aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport/transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/transport/transport.c')
-rw-r--r--src/core/lib/transport/transport.c64
1 files changed, 59 insertions, 5 deletions
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 857c3909d2..75aec7a5b4 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -32,18 +32,23 @@
*/
#include "src/core/lib/transport/transport.h"
+
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport_impl.h"
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
- gpr_log(GPR_DEBUG, "%s %p:%p REF %d->%d %s", refcount->object_type,
- refcount, refcount->destroy.cb_arg, val, val + 1, reason);
+ gpr_log(GPR_DEBUG, "%s %p:%p REF %" PRIdPTR "->%" PRIdPTR " %s",
+ refcount->object_type, refcount, refcount->destroy.cb_arg, val,
+ val + 1, reason);
#else
void grpc_stream_ref(grpc_stream_refcount *refcount) {
#endif
@@ -54,8 +59,9 @@ void grpc_stream_ref(grpc_stream_refcount *refcount) {
void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
const char *reason) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
- gpr_log(GPR_DEBUG, "%s %p:%p UNREF %d->%d %s", refcount->object_type,
- refcount, refcount->destroy.cb_arg, val, val - 1, reason);
+ gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s",
+ refcount->object_type, refcount, refcount->destroy.cb_arg, val,
+ val - 1, reason);
#else
void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
@@ -220,7 +226,7 @@ void grpc_transport_stream_op_add_cancellation_with_message(
error = GRPC_ERROR_CREATE("Call cancelled");
}
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
- add_error(op, &op->close_error, error);
+ add_error(op, &op->cancel_error, error);
}
void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
@@ -247,3 +253,51 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
add_error(op, &op->close_error, error);
}
+
+typedef struct {
+ grpc_closure outer_on_complete;
+ grpc_closure *inner_on_complete;
+ grpc_transport_op op;
+} made_transport_op;
+
+static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ made_transport_op *op = arg;
+ grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
+ NULL);
+ gpr_free(op);
+}
+
+grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
+ made_transport_op *op = gpr_malloc(sizeof(*op));
+ grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op);
+ op->inner_on_complete = on_complete;
+ memset(&op->op, 0, sizeof(op->op));
+ op->op.on_consumed = &op->outer_on_complete;
+ return &op->op;
+}
+
+typedef struct {
+ grpc_closure outer_on_complete;
+ grpc_closure *inner_on_complete;
+ grpc_transport_stream_op op;
+} made_transport_stream_op;
+
+static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ made_transport_stream_op *op = arg;
+ grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
+ NULL);
+ gpr_free(op);
+}
+
+grpc_transport_stream_op *grpc_make_transport_stream_op(
+ grpc_closure *on_complete) {
+ made_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op,
+ op);
+ op->inner_on_complete = on_complete;
+ memset(&op->op, 0, sizeof(op->op));
+ op->op.on_complete = &op->outer_on_complete;
+ return &op->op;
+}