diff options
Diffstat (limited to 'src/core/lib/transport/transport.c')
-rw-r--r-- | src/core/lib/transport/transport.c | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index d56cb31ee0..82c4e004b7 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -170,7 +170,7 @@ int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { transport->vtable->perform_stream_op(exec_ctx, transport, stream, op); } @@ -213,14 +213,23 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, return transport->vtable->get_endpoint(exec_ctx, transport); } -void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, - grpc_transport_stream_op *op, - grpc_error *error) { - grpc_closure_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error)); - grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, - GRPC_ERROR_REF(error)); +void grpc_transport_stream_op_batch_finish_with_failure( + grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, + grpc_error *error) { + if (op->recv_message) { + grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error)); + } + if (op->recv_initial_metadata) { + grpc_closure_sched( + exec_ctx, + op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); + } grpc_closure_sched(exec_ctx, op->on_complete, error); - GRPC_ERROR_UNREF(op->cancel_error); + if (op->cancel_stream) { + GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error); + } } typedef struct { @@ -249,7 +258,8 @@ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { typedef struct { grpc_closure outer_on_complete; grpc_closure *inner_on_complete; - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload payload; } made_transport_stream_op; static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, @@ -260,13 +270,13 @@ static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error)); } -grpc_transport_stream_op *grpc_make_transport_stream_op( +grpc_transport_stream_op_batch *grpc_make_transport_stream_op( grpc_closure *on_complete) { - made_transport_stream_op *op = gpr_malloc(sizeof(*op)); + made_transport_stream_op *op = gpr_zalloc(sizeof(*op)); + op->op.payload = &op->payload; grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op, op, grpc_schedule_on_exec_ctx); op->inner_on_complete = on_complete; - memset(&op->op, 0, sizeof(op->op)); op->op.on_complete = &op->outer_on_complete; return &op->op; } |