diff options
author | 2018-02-15 10:45:13 -0800 | |
---|---|---|
committer | 2018-02-15 10:45:13 -0800 | |
commit | 782fdc745363bf415e447004498f7df90a5a2894 (patch) | |
tree | fe559b21dd6ef141bf1a859e7ad24fae23213be4 /src/core/ext/transport | |
parent | ddd50b9a93c4c4df28e72e4986b655c23a057ebf (diff) | |
parent | d50b19188f654d2e4aae1f7cf6a7f6a09961eccb (diff) |
Merge pull request #14416 from markdroth/send_message_slice_ownership
Take ownership of byte_buffer contents as soon as send_message op is started.
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 6 | ||||
-rw-r--r-- | src/core/ext/transport/inproc/inproc_transport.cc | 18 |
2 files changed, 23 insertions, 1 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index ad8da94cb3..2fc3c4fa41 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1473,6 +1473,7 @@ static void perform_stream_op_locked(void* stream_op, // streaming call might send another message before getting a // recv_message failure, breaking out of its loop, and then // starting recv_trailing_metadata. + grpc_byte_stream_destroy(op->payload->send_message.send_message); grpc_chttp2_complete_closure_step( t, s, &s->fetching_send_message_finished, t->is_client && s->received_trailing_metadata @@ -2092,7 +2093,10 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t, GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); - s->fetching_send_message = nullptr; + if (s->fetching_send_message != nullptr) { + grpc_byte_stream_destroy(s->fetching_send_message); + s->fetching_send_message = nullptr; + } grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 2022eaffe5..e1d4843785 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -480,6 +480,8 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { s->recv_message_op = nullptr; } if (s->send_message_op) { + grpc_byte_stream_destroy( + s->send_message_op->payload->send_message.send_message); complete_if_batch_end_locked( s, error, s->send_message_op, "fail_helper scheduling send-message-on-complete"); @@ -506,6 +508,14 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { GRPC_ERROR_UNREF(error); } +// TODO(vjpai): It should not be necessary to drain the incoming byte +// stream and create a new one; instead, we should simply pass the byte +// stream from the sender directly to the receiver as-is. +// +// Note that fixing this will also avoid the assumption in this code +// that the incoming byte stream's next() call will always return +// synchronously. That assumption is true today but may not always be +// true in the future. static void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { size_t remaining = @@ -532,6 +542,8 @@ static void message_transfer_locked(inproc_stream* sender, remaining -= GRPC_SLICE_LENGTH(message_slice); grpc_slice_buffer_add(&receiver->recv_message, message_slice); } while (remaining > 0); + grpc_byte_stream_destroy( + sender->send_message_op->payload->send_message.send_message); grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message, 0); @@ -592,6 +604,8 @@ static void op_state_machine(void* arg, grpc_error* error) { (s->trailing_md_sent || other->recv_trailing_md_op)) { // A server send will never be matched if the client is waiting // for trailing metadata already + grpc_byte_stream_destroy( + s->send_message_op->payload->send_message.send_message); complete_if_batch_end_locked( s, GRPC_ERROR_NONE, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); @@ -728,6 +742,8 @@ static void op_state_machine(void* arg, grpc_error* error) { if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) { // Nothing further will try to receive from this stream, so finish off // any outstanding send_message op + grpc_byte_stream_destroy( + s->send_message_op->payload->send_message.send_message); complete_if_batch_end_locked( s, new_err, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); @@ -785,6 +801,8 @@ static void op_state_machine(void* arg, grpc_error* error) { s->send_message_op) { // Nothing further will try to receive from this stream, so finish off // any outstanding send_message op + grpc_byte_stream_destroy( + s->send_message_op->payload->send_message.send_message); complete_if_batch_end_locked( s, new_err, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); |