aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport/transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/transport/transport.c')
-rw-r--r--src/core/lib/transport/transport.c47
1 files changed, 42 insertions, 5 deletions
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 004e748f25..d56cb31ee0 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -84,6 +84,39 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
}
}
+#define STREAM_REF_FROM_SLICE_REF(p) \
+ ((grpc_stream_refcount *)(((uint8_t *)p) - \
+ offsetof(grpc_stream_refcount, slice_refcount)))
+
+static void slice_stream_ref(void *p) {
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p), "slice");
+#else
+ grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p));
+#endif
+}
+
+static void slice_stream_unref(grpc_exec_ctx *exec_ctx, void *p) {
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p), "slice");
+#else
+ grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p));
+#endif
+}
+
+grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount,
+ void *buffer, size_t length) {
+ slice_stream_ref(&refcount->slice_refcount);
+ return (grpc_slice){.refcount = &refcount->slice_refcount,
+ .data.refcounted = {.bytes = buffer, .length = length}};
+}
+
+static const grpc_slice_refcount_vtable stream_ref_slice_vtable = {
+ .ref = slice_stream_ref,
+ .unref = slice_stream_unref,
+ .eq = grpc_slice_default_eq_impl,
+ .hash = grpc_slice_default_hash_impl};
+
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
grpc_iomgr_cb_func cb, void *cb_arg,
@@ -95,6 +128,8 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
#endif
gpr_ref_init(&refcount->refs, initial_refs);
grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
+ refcount->slice_refcount.vtable = &stream_ref_slice_vtable;
+ refcount->slice_refcount.sub_refcount = &refcount->slice_refcount;
}
static void move64(uint64_t *from, uint64_t *to) {
@@ -127,9 +162,9 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx,
int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_stream *stream,
grpc_stream_refcount *refcount,
- const void *server_data) {
+ const void *server_data, gpr_arena *arena) {
return transport->vtable->init_stream(exec_ctx, transport, stream, refcount,
- server_data);
+ server_data, arena);
}
void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,
@@ -162,9 +197,10 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- grpc_stream *stream, void *and_free_memory) {
+ grpc_stream *stream,
+ grpc_closure *then_schedule_closure) {
transport->vtable->destroy_stream(exec_ctx, transport, stream,
- and_free_memory);
+ then_schedule_closure);
}
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
@@ -219,8 +255,9 @@ typedef struct {
static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_stream_op *op = arg;
- grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
+ grpc_closure *c = op->inner_on_complete;
gpr_free(op);
+ grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error));
}
grpc_transport_stream_op *grpc_make_transport_stream_op(