aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2017-09-07 14:43:47 -0700
committerGravatar GitHub <noreply@github.com>2017-09-07 14:43:47 -0700
commit8941f607d682d49f0b7c901bffd3e52941e4c5f9 (patch)
tree3b8d2b29e0abef75ce1d4de983b9c6dfb183d24f /src/core/lib/transport
parentb13c8cb03f863cbc207d7e6e17c966e6a4ef5350 (diff)
parentd4ac3a156feb36b1e26cef7851dfc8e7923e5f2a (diff)
Merge pull request #12369 from markdroth/call_combiner
Second attempt at call combiner PR
Diffstat (limited to 'src/core/lib/transport')
-rw-r--r--src/core/lib/transport/transport.c21
-rw-r--r--src/core/lib/transport/transport.h13
-rw-r--r--src/core/lib/transport/transport_impl.h3
-rw-r--r--src/core/lib/transport/transport_op_string.c7
4 files changed, 23 insertions, 21 deletions
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 6c61f4b8d9..650b0559aa 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -197,11 +197,6 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
then_schedule_closure);
}
-char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_transport *transport) {
- return transport->vtable->get_peer(exec_ctx, transport);
-}
-
grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *transport) {
return transport->vtable->get_endpoint(exec_ctx, transport);
@@ -214,24 +209,24 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
// is a function that must always unref cancel_error
// though it lives in lib, it handles transport stream ops sure
// it's grpc_transport_stream_op_batch_finish_with_failure
-
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch,
- grpc_error *error) {
+ grpc_error *error, grpc_call_combiner *call_combiner) {
if (batch->send_message) {
grpc_byte_stream_destroy(exec_ctx,
batch->payload->send_message.send_message);
}
if (batch->recv_message) {
- GRPC_CLOSURE_SCHED(exec_ctx,
- batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error));
+ GRPC_CALL_COMBINER_START(exec_ctx, call_combiner,
+ batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error),
+ "failing recv_message_ready");
}
if (batch->recv_initial_metadata) {
- GRPC_CLOSURE_SCHED(
- exec_ctx,
+ GRPC_CALL_COMBINER_START(
+ exec_ctx, call_combiner,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
- GRPC_ERROR_REF(error));
+ GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
}
GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error);
if (batch->cancel_stream) {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 099138ea14..fbf5dcb8b5 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -22,6 +22,7 @@
#include <stddef.h>
#include "src/core/lib/channel/context.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
@@ -152,6 +153,9 @@ struct grpc_transport_stream_op_batch_payload {
/** Iff send_initial_metadata != NULL, flags associated with
send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
uint32_t send_initial_metadata_flags;
+ // If non-NULL, will be set by the transport to the peer string
+ // (a char*, which the caller takes ownership of).
+ gpr_atm *peer_string;
} send_initial_metadata;
struct {
@@ -176,6 +180,9 @@ struct grpc_transport_stream_op_batch_payload {
// immediately available. This may be a signal that we received a
// Trailers-Only response.
bool *trailing_metadata_available;
+ // If non-NULL, will be set by the transport to the peer string
+ // (a char*, which the caller takes ownership of).
+ gpr_atm *peer_string;
} recv_initial_metadata;
struct {
@@ -293,7 +300,7 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
- grpc_error *error);
+ grpc_error *error, grpc_call_combiner *call_combiner);
char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op);
char *grpc_transport_op_string(grpc_transport_op *op);
@@ -332,10 +339,6 @@ void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
-/* Get the transports peer */
-char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_transport *transport);
-
/* Get the endpoint used by \a transport */
grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *transport);
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index fc772c6dd1..bbae69c223 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -59,9 +59,6 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
- /* implementation of grpc_transport_get_peer */
- char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
-
/* implementation of grpc_transport_get_endpoint */
grpc_endpoint *(*get_endpoint)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
} grpc_transport_vtable;
diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c
index 7b18229ba6..409a6c4103 100644
--- a/src/core/lib/transport/transport_op_string.c
+++ b/src/core/lib/transport/transport_op_string.c
@@ -112,6 +112,13 @@ char *grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp);
}
+ if (op->collect_stats) {
+ gpr_strvec_add(&b, gpr_strdup(" "));
+ gpr_asprintf(&tmp, "COLLECT_STATS:%p",
+ op->payload->collect_stats.collect_stats);
+ gpr_strvec_add(&b, tmp);
+ }
+
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);