aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport/transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/transport/transport.c')
-rw-r--r--src/core/lib/transport/transport.c34
1 files changed, 22 insertions, 12 deletions
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index d56cb31ee0..82c4e004b7 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -170,7 +170,7 @@ int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
grpc_stream *stream,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
transport->vtable->perform_stream_op(exec_ctx, transport, stream, op);
}
@@ -213,14 +213,23 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
return transport->vtable->get_endpoint(exec_ctx, transport);
}
-void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
- grpc_transport_stream_op *op,
- grpc_error *error) {
- grpc_closure_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error));
- grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
- GRPC_ERROR_REF(error));
+void grpc_transport_stream_op_batch_finish_with_failure(
+ grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
+ grpc_error *error) {
+ if (op->recv_message) {
+ grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error));
+ }
+ if (op->recv_initial_metadata) {
+ grpc_closure_sched(
+ exec_ctx,
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_REF(error));
+ }
grpc_closure_sched(exec_ctx, op->on_complete, error);
- GRPC_ERROR_UNREF(op->cancel_error);
+ if (op->cancel_stream) {
+ GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error);
+ }
}
typedef struct {
@@ -249,7 +258,8 @@ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
typedef struct {
grpc_closure outer_on_complete;
grpc_closure *inner_on_complete;
- grpc_transport_stream_op op;
+ grpc_transport_stream_op_batch op;
+ grpc_transport_stream_op_batch_payload payload;
} made_transport_stream_op;
static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
@@ -260,13 +270,13 @@ static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error));
}
-grpc_transport_stream_op *grpc_make_transport_stream_op(
+grpc_transport_stream_op_batch *grpc_make_transport_stream_op(
grpc_closure *on_complete) {
- made_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ made_transport_stream_op *op = gpr_zalloc(sizeof(*op));
+ op->op.payload = &op->payload;
grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op,
op, grpc_schedule_on_exec_ctx);
op->inner_on_complete = on_complete;
- memset(&op->op, 0, sizeof(op->op));
op->op.on_complete = &op->outer_on_complete;
return &op->op;
}