aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-07-11 15:56:08 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-07-11 15:56:08 -0700
commite0221ff3401d397b981fbb3e057fa7812de37f35 (patch)
treec2db05a6d49df62bae6a5fc060f60ba5a1daa882 /src/core
parent1fccf89404f535573724ef32e223986e2eb96f44 (diff)
Debugging
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_config/subchannel.c17
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c2
-rw-r--r--src/core/lib/iomgr/combiner.c61
-rw-r--r--src/core/lib/iomgr/combiner.h11
-rw-r--r--src/core/lib/iomgr/exec_ctx.h6
-rw-r--r--src/core/lib/surface/channel.c7
-rw-r--r--src/core/lib/surface/channel_ping.c9
-rw-r--r--src/core/lib/surface/server.c56
-rw-r--r--src/core/lib/transport/transport.c26
-rw-r--r--src/core/lib/transport/transport.h4
10 files changed, 132 insertions, 67 deletions
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index d089cd4399..2318bd1023 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -504,14 +504,14 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *closure) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
memset(&op, 0, sizeof(op));
- op.connectivity_state = state;
- op.on_connectivity_state_change = closure;
- op.bind_pollset_set = interested_parties;
+ op->connectivity_state = state;
+ op->on_connectivity_state_change = closure;
+ op->bind_pollset_set = interested_parties;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
void grpc_connected_subchannel_notify_on_state_change(
@@ -525,12 +525,11 @@ void grpc_connected_subchannel_notify_on_state_change(
void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *con,
grpc_closure *closure) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
- memset(&op, 0, sizeof(op));
- op.send_ping = closure;
+ op->send_ping = closure;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 194264c8f9..4dc642e37a 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -181,7 +181,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
- grpc_combiner_destroy(t->executor.combiner);
+ grpc_combiner_destroy(exec_ctx, t->executor.combiner);
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 5dfef4f25b..99e112308b 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -32,6 +32,7 @@
*/
#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/workqueue.h"
#include <string.h>
@@ -52,7 +53,9 @@ struct grpc_combiner {
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
- lock->optional_workqueue = optional_workqueue;
+ lock->optional_workqueue =
+ optional_workqueue ? GRPC_WORKQUEUE_REF(optional_workqueue, "combiner")
+ : NULL;
gpr_atm_no_barrier_store(&lock->state, 1);
gpr_mpscq_init(&lock->queue);
lock->take_async_break_before_final_list = false;
@@ -60,15 +63,18 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
return lock;
}
-static void really_destroy(grpc_combiner *lock) {
+static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
gpr_mpscq_destroy(&lock->queue);
+ if (lock->optional_workqueue != NULL) {
+ GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner");
+ }
gpr_free(lock);
}
-void grpc_combiner_destroy(grpc_combiner *lock) {
+void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) {
- really_destroy(lock);
+ really_destroy(exec_ctx, lock);
}
}
@@ -77,7 +83,12 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- if (maybe_finish_one(exec_ctx, arg)) finish(exec_ctx, arg);
+ grpc_combiner *lock = arg;
+ GPR_ASSERT(exec_ctx->active_combiner == NULL);
+ exec_ctx->active_combiner = lock;
+ if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
+ GPR_ASSERT(exec_ctx->active_combiner == lock);
+ exec_ctx->active_combiner = NULL;
}
static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
@@ -96,6 +107,8 @@ static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_combiner *lock = arg;
+ GPR_ASSERT(exec_ctx->active_combiner == NULL);
+ exec_ctx->active_combiner = lock;
// quick peek to see if new things have turned up on the queue: if so, go back
// to executing them before the final list
if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) {
@@ -104,9 +117,12 @@ static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
execute_final(exec_ctx, lock);
finish(exec_ctx, lock);
}
+ GPR_ASSERT(exec_ctx->active_combiner == lock);
+ exec_ctx->active_combiner = NULL;
}
static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+ GPR_ASSERT(exec_ctx->active_combiner == lock);
if (lock->take_async_break_before_final_list) {
grpc_closure_init(&lock->continue_finishing, continue_executing_final,
lock);
@@ -121,6 +137,7 @@ static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
+ GPR_ASSERT(exec_ctx->active_combiner == lock);
if (n == NULL) {
// queue is in an inconsistant state: use this as a cue that we should
// go off and do something else for a while (and come back later)
@@ -151,7 +168,7 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
case 3: // had one count, one unorphaned --> unlocked unorphaned
return;
case 2: // and one count, one orphaned --> unlocked and orphaned
- really_destroy(lock);
+ really_destroy(exec_ctx, lock);
return;
case 1:
case 0:
@@ -166,19 +183,43 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *cl, grpc_error *error) {
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
GPR_ASSERT(last & 1); // ensure lock has not been destroyed
- if (last == 1) {
- cl->cb(exec_ctx, cl->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- finish(exec_ctx, lock);
+ if (exec_ctx->active_combiner == NULL) {
+ if (last == 1) {
+ exec_ctx->active_combiner = lock;
+ cl->cb(exec_ctx, cl->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ finish(exec_ctx, lock);
+ GPR_ASSERT(exec_ctx->active_combiner == lock);
+ exec_ctx->active_combiner = NULL;
+ } else {
+ cl->error = error;
+ gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
+ }
} else {
cl->error = error;
gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
+ grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline,
+ lock);
+ grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
+ lock->optional_workqueue);
}
}
+static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
+ grpc_error *error) {
+ grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
+ GRPC_ERROR_REF(error), true);
+}
+
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error,
bool force_async_break) {
+ if (exec_ctx->active_combiner != lock) {
+ grpc_combiner_execute(exec_ctx, lock,
+ grpc_closure_create(enqueue_finally, closure), error);
+ return;
+ }
+
if (force_async_break) {
lock->take_async_break_before_final_list = true;
}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index 3554202aae..5b94d5bd99 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -40,8 +40,6 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/mpscq.h"
-typedef struct grpc_combiner grpc_combiner;
-
// Provides serialized access to some resource.
// Each action queued on an aelock is executed serially in a borrowed thread.
// The actual thread executing actions may change over time (but there will only
@@ -51,18 +49,19 @@ typedef struct grpc_combiner grpc_combiner;
// necessary
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
// Destroy the lock
-void grpc_combiner_destroy(grpc_combiner *lock);
+void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
// Execute \a action within the lock.
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error);
// Execute \a action within the lock just prior to unlocking.
-// if \a force_async_break is additionally set, the combiner is forced to trip
+// if \a hint_async_break is additionally set, the combiner is tries to trip
// through the workqueue between finishing the primary queue of combined
// closures and executing the finally list.
-// Can only be called from within a closure scheduled by grpc_combiner_execute
+// Takes a very slow and round-about path if not called from a
+// grpc_combiner_execute closure
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error,
- bool force_async_break);
+ bool hint_async_break);
void grpc_combiner_force_async_finally(grpc_combiner *lock);
#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 917f332f03..1895ee6245 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -40,8 +40,8 @@
/** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */
-struct grpc_workqueue;
typedef struct grpc_workqueue grpc_workqueue;
+typedef struct grpc_combiner grpc_combiner;
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
/** Execution context.
@@ -66,13 +66,15 @@ typedef struct grpc_workqueue grpc_workqueue;
*/
struct grpc_exec_ctx {
grpc_closure_list closure_list;
+ /** currently active combiner: updated only via combiner.c */
+ grpc_combiner *active_combiner;
bool cached_ready_to_finish;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
- { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check }
+ { GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check }
#else
struct grpc_exec_ctx {
bool cached_ready_to_finish;
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 6d2b1c4935..52e78567bd 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -334,14 +334,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
}
void grpc_channel_destroy(grpc_channel *channel) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel));
- memset(&op, 0, sizeof(op));
- op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
+ op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
- elem->filter->start_transport_op(&exec_ctx, elem, &op);
+ elem->filter->start_transport_op(&exec_ctx, elem, op);
GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel");
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 9818f9d2f2..5f9f07f89a 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -61,19 +61,18 @@ static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
void *tag, void *reserved) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
ping_result *pr = gpr_malloc(sizeof(*pr));
grpc_channel_element *top_elem =
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(reserved == NULL);
- memset(&op, 0, sizeof(op));
pr->tag = tag;
pr->cq = cq;
grpc_closure_init(&pr->closure, ping_done, pr);
- op.send_ping = &pr->closure;
- op.bind_pollset = grpc_cq_pollset(cq);
+ op->send_ping = &pr->closure;
+ op->bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq, tag);
- top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op);
+ top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index def6e5068b..fc9987f0aa 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -272,22 +272,20 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
int send_goaway, grpc_error *send_disconnect) {
- grpc_transport_op op;
- struct shutdown_cleanup_args *sc;
+ struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc));
+ grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
+ grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
grpc_channel_element *elem;
- memset(&op, 0, sizeof(op));
- op.send_goaway = send_goaway;
- sc = gpr_malloc(sizeof(*sc));
+ op->send_goaway = send_goaway;
sc->slice = gpr_slice_from_copied_string("Server shutdown");
- op.goaway_message = &sc->slice;
- op.goaway_status = GRPC_STATUS_OK;
- op.disconnect_with_error = send_disconnect;
- grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
- op.on_consumed = &sc->closure;
+ op->goaway_message = &sc->slice;
+ op->goaway_status = GRPC_STATUS_OK;
+ op->disconnect_with_error = send_disconnect;
+ op->on_consumed = &sc->closure;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
@@ -434,14 +432,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- grpc_transport_op op;
- memset(&op, 0, sizeof(op));
- op.set_accept_stream = true;
- op.on_consumed = &chand->finish_destroy_channel_closure;
+ grpc_transport_op *op =
+ grpc_make_transport_op(&chand->finish_destroy_channel_closure);
+ op->set_accept_stream = true;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- &op);
+ op);
}
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
@@ -832,14 +829,13 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
channel_data *chand = cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_transport_op op;
- memset(&op, 0, sizeof(op));
- op.on_connectivity_state_change = &chand->channel_connectivity_changed,
- op.connectivity_state = &chand->connectivity_state;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op->connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- &op);
+ op);
} else {
gpr_mu_lock(&server->mu_global);
destroy_channel(exec_ctx, chand);
@@ -1101,7 +1097,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
size_t slots;
uint32_t probes;
uint32_t max_probes = 0;
- grpc_transport_op op;
+ grpc_transport_op *op = NULL;
channel =
grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
@@ -1161,16 +1157,16 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
gpr_mu_unlock(&s->mu_global);
GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
- memset(&op, 0, sizeof(op));
- op.set_accept_stream = true;
- op.set_accept_stream_fn = accept_stream;
- op.set_accept_stream_user_data = chand;
- op.on_connectivity_state_change = &chand->channel_connectivity_changed;
- op.connectivity_state = &chand->connectivity_state;
+ op = grpc_make_transport_op(NULL);
+ op->set_accept_stream = true;
+ op->set_accept_stream_fn = accept_stream;
+ op->set_accept_stream_user_data = chand;
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op->connectivity_state = &chand->connectivity_state;
if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
- op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
+ op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
}
- grpc_transport_perform_op(exec_ctx, transport, &op);
+ grpc_transport_perform_op(exec_ctx, transport, op);
}
void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 857c3909d2..970d9c389a 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -32,10 +32,14 @@
*/
#include "src/core/lib/transport/transport.h"
+
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -247,3 +251,25 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
add_error(op, &op->close_error, error);
}
+
+typedef struct {
+ grpc_closure outer_on_complete;
+ grpc_closure *inner_on_complete;
+ grpc_transport_op op;
+} made_transport_op;
+
+static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ made_transport_op *op = arg;
+ grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
+ NULL);
+ gpr_free(op);
+}
+
+grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
+ made_transport_op *op = gpr_malloc(sizeof(*op));
+ grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op);
+ op->inner_on_complete = on_complete;
+ memset(&op->op, 0, sizeof(op->op));
+ return &op->op;
+}
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index b04b60ec3d..508b57a5b4 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -285,4 +285,8 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
grpc_transport *transport);
+/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
+ \a on_consumed and then delete the returned transport op */
+grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed);
+
#endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */