aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport/transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/transport/transport.c')
-rw-r--r--src/core/lib/transport/transport.c43
1 files changed, 22 insertions, 21 deletions
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index b448126da8..055edbb39f 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -40,6 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -68,7 +69,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
- grpc_exec_ctx_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}
}
@@ -82,7 +83,7 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
grpc_iomgr_cb_func cb, void *cb_arg) {
#endif
gpr_ref_init(&refcount->refs, initial_refs);
- grpc_closure_init(&refcount->destroy, cb, cb_arg);
+ grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
}
static void move64(uint64_t *from, uint64_t *to) {
@@ -168,11 +169,10 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op,
grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error),
- NULL);
- grpc_exec_ctx_sched(exec_ctx, op->recv_initial_metadata_ready,
- GRPC_ERROR_REF(error), NULL);
- grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL);
+ grpc_closure_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error));
+ grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
+ GRPC_ERROR_REF(error));
+ grpc_closure_sched(exec_ctx, op->on_complete, error);
}
typedef struct {
@@ -196,7 +196,8 @@ static void add_error(grpc_transport_stream_op *op, grpc_error **which,
cmd = gpr_malloc(sizeof(*cmd));
cmd->error = error;
cmd->then_call = op->on_complete;
- grpc_closure_init(&cmd->closure, free_message, cmd);
+ grpc_closure_init(&cmd->closure, free_message, cmd,
+ grpc_schedule_on_exec_ctx);
op->on_complete = &cmd->closure;
*which = error;
}
@@ -212,12 +213,12 @@ void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
}
void grpc_transport_stream_op_add_cancellation_with_message(
- grpc_transport_stream_op *op, grpc_status_code status,
- grpc_slice *optional_message) {
+ grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op,
+ grpc_status_code status, grpc_slice *optional_message) {
GPR_ASSERT(status != GRPC_STATUS_OK);
if (op->cancel_error != GRPC_ERROR_NONE) {
if (optional_message) {
- grpc_slice_unref(*optional_message);
+ grpc_slice_unref_internal(exec_ctx, *optional_message);
}
return;
}
@@ -227,7 +228,7 @@ void grpc_transport_stream_op_add_cancellation_with_message(
error = grpc_error_set_str(GRPC_ERROR_CREATE(msg),
GRPC_ERROR_STR_GRPC_MESSAGE, msg);
gpr_free(msg);
- grpc_slice_unref(*optional_message);
+ grpc_slice_unref_internal(exec_ctx, *optional_message);
} else {
error = GRPC_ERROR_CREATE("Call cancelled");
}
@@ -235,14 +236,15 @@ void grpc_transport_stream_op_add_cancellation_with_message(
add_error(op, &op->cancel_error, error);
}
-void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
+void grpc_transport_stream_op_add_close(grpc_exec_ctx *exec_ctx,
+ grpc_transport_stream_op *op,
grpc_status_code status,
grpc_slice *optional_message) {
GPR_ASSERT(status != GRPC_STATUS_OK);
if (op->cancel_error != GRPC_ERROR_NONE ||
op->close_error != GRPC_ERROR_NONE) {
if (optional_message) {
- grpc_slice_unref(*optional_message);
+ grpc_slice_unref_internal(exec_ctx, *optional_message);
}
return;
}
@@ -252,7 +254,7 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
error = grpc_error_set_str(GRPC_ERROR_CREATE(msg),
GRPC_ERROR_STR_GRPC_MESSAGE, msg);
gpr_free(msg);
- grpc_slice_unref(*optional_message);
+ grpc_slice_unref_internal(exec_ctx, *optional_message);
} else {
error = GRPC_ERROR_CREATE("Call force closed");
}
@@ -269,14 +271,14 @@ typedef struct {
static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_op *op = arg;
- grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
gpr_free(op);
}
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
made_transport_op *op = gpr_malloc(sizeof(*op));
- grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op);
+ grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op,
+ grpc_schedule_on_exec_ctx);
op->inner_on_complete = on_complete;
memset(&op->op, 0, sizeof(op->op));
op->op.on_consumed = &op->outer_on_complete;
@@ -292,8 +294,7 @@ typedef struct {
static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_stream_op *op = arg;
- grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
gpr_free(op);
}
@@ -301,7 +302,7 @@ grpc_transport_stream_op *grpc_make_transport_stream_op(
grpc_closure *on_complete) {
made_transport_stream_op *op = gpr_malloc(sizeof(*op));
grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op,
- op);
+ op, grpc_schedule_on_exec_ctx);
op->inner_on_complete = on_complete;
memset(&op->op, 0, sizeof(op->op));
op->op.on_complete = &op->outer_on_complete;