diff options
author | Soheil Hassas Yeganeh <soheil@google.com> | 2018-11-02 15:20:02 -0400 |
---|---|---|
committer | Soheil Hassas Yeganeh <soheil@google.com> | 2018-11-05 10:12:39 -0500 |
commit | 48e4a81b05f2ad6541d72e819cd4f638055f13d5 (patch) | |
tree | 8d5b64b7113721afb2eb4a0363cbd3fd7e47ff41 /src/core/ext/transport/cronet/transport/cronet_transport.cc | |
parent | 5e6c4491bf60aa91bd3e4fed3c8203601a4c795e (diff) |
Remeve memset(0) from arena allocated memory.
Callers are updated to properly initialize the memory.
This behavior can be overridden using GRPC_ARENA_INIT_STRATEGY
environment variable.
Diffstat (limited to 'src/core/ext/transport/cronet/transport/cronet_transport.cc')
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.cc | 153 |
1 files changed, 74 insertions, 79 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 81e2634e3a..349d8681d5 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -111,16 +111,21 @@ typedef struct grpc_cronet_transport grpc_cronet_transport; /* TODO (makdharma): reorder structure for memory efficiency per http://www.catb.org/esr/structure-packing/#_structure_reordering: */ struct read_state { + read_state(gpr_arena* arena) + : trailing_metadata(arena), initial_metadata(arena) { + grpc_slice_buffer_init(&read_slice_buffer); + } + /* vars to store data coming from server */ - char* read_buffer; - bool length_field_received; - int received_bytes; - int remaining_bytes; - int length_field; - bool compressed; - char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]; - char* payload_field; - bool read_stream_closed; + char* read_buffer = nullptr; + bool length_field_received = false; + int received_bytes = 0; + int remaining_bytes = 0; + int length_field = 0; + bool compressed = 0; + char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {}; + char* payload_field = nullptr; + bool read_stream_closed = 0; /* vars for holding data destined for the application */ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs; @@ -128,59 +133,71 @@ struct read_state { /* vars for trailing metadata */ grpc_chttp2_incoming_metadata_buffer trailing_metadata; - bool trailing_metadata_valid; + bool trailing_metadata_valid = false; /* vars for initial metadata */ grpc_chttp2_incoming_metadata_buffer initial_metadata; }; struct write_state { - char* write_buffer; + char* write_buffer = nullptr; }; /* track state of one stream op */ struct op_state { - bool state_op_done[OP_NUM_OPS]; - bool state_callback_received[OP_NUM_OPS]; + op_state(gpr_arena* arena) : rs(arena) {} + + bool state_op_done[OP_NUM_OPS] = {}; + bool state_callback_received[OP_NUM_OPS] = {}; /* A non-zero gRPC status code has been seen */ - bool fail_state; + bool fail_state = false; /* Transport is discarding all buffered messages */ - bool flush_read; - bool flush_cronet_when_ready; - bool pending_write_for_trailer; - bool pending_send_message; + bool flush_read = false; + bool flush_cronet_when_ready = false; + bool pending_write_for_trailer = false; + bool pending_send_message = false; /* User requested RECV_TRAILING_METADATA */ - bool pending_recv_trailing_metadata; + bool pending_recv_trailing_metadata = false; /* Cronet has not issued a callback of a bidirectional read */ - bool pending_read_from_cronet; - grpc_error* cancel_error; + bool pending_read_from_cronet = false; + grpc_error* cancel_error = GRPC_ERROR_NONE; /* data structure for storing data coming from server */ struct read_state rs; /* data structure for storing data going to the server */ struct write_state ws; }; +struct stream_obj; + struct op_and_state { + op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op); + grpc_transport_stream_op_batch op; struct op_state state; - bool done; - struct stream_obj* s; /* Pointer back to the stream object */ - struct op_and_state* next; /* next op_and_state in the linked list */ + bool done = false; + struct stream_obj* s; /* Pointer back to the stream object */ + /* next op_and_state in the linked list */ + struct op_and_state* next; }; struct op_storage { - int num_pending_ops; - struct op_and_state* head; + int num_pending_ops = 0; + struct op_and_state* head = nullptr; }; struct stream_obj { + stream_obj(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, gpr_arena* arena); + ~stream_obj(); + gpr_arena* arena; - struct op_and_state* oas; - grpc_transport_stream_op_batch* curr_op; + struct op_and_state* oas = nullptr; + grpc_transport_stream_op_batch* curr_op = nullptr; grpc_cronet_transport* curr_ct; grpc_stream* curr_gs; - bidirectional_stream* cbs; - bidirectional_stream_header_array header_array; + bidirectional_stream* cbs = nullptr; + bidirectional_stream_header_array header_array = + bidirectional_stream_header_array(); // Zero-initialize the structure. /* Stream level state. Some state will be tracked both at stream and stream_op * level */ @@ -195,7 +212,6 @@ struct stream_obj { /* Refcount object of the stream */ grpc_stream_refcount* refcount; }; -typedef struct stream_obj stream_obj; #ifndef NDEBUG #define GRPC_CRONET_STREAM_REF(stream, reason) \ @@ -306,6 +322,10 @@ static grpc_error* make_error_with_desc(int error_code, const char* desc) { return error; } +inline op_and_state::op_and_state(stream_obj* s, + const grpc_transport_stream_op_batch& op) + : op(op), state(s->arena), s(s), next(s->storage.head) {} + /* Add a new stream op to op storage. */ @@ -314,14 +334,8 @@ static void add_to_storage(struct stream_obj* s, struct op_storage* storage = &s->storage; /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ - struct op_and_state* new_op = static_cast<struct op_and_state*>( - gpr_malloc(sizeof(struct op_and_state))); - memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch)); - memset(&new_op->state, 0, sizeof(new_op->state)); - new_op->s = s; - new_op->done = false; + op_and_state* new_op = grpc_core::New<op_and_state>(s, *op); gpr_mu_lock(&s->mu); - new_op->next = storage->head; storage->head = new_op; storage->num_pending_ops++; if (op->send_message) { @@ -347,7 +361,7 @@ static void remove_from_storage(struct stream_obj* s, } if (s->storage.head == oas) { s->storage.head = oas->next; - gpr_free(oas); + grpc_core::Delete(oas); s->storage.num_pending_ops--; CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas, s->storage.num_pending_ops); @@ -358,7 +372,7 @@ static void remove_from_storage(struct stream_obj* s, s->storage.num_pending_ops--; CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas, s->storage.num_pending_ops); - gpr_free(oas); + grpc_core::Delete(oas); break; } else if (GPR_UNLIKELY(curr->next == nullptr)) { CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free"); @@ -540,10 +554,6 @@ static void on_response_headers_received( } gpr_mu_lock(&s->mu); - memset(&s->state.rs.initial_metadata, 0, - sizeof(s->state.rs.initial_metadata)); - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, - s->arena); convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata); s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -634,11 +644,7 @@ static void on_response_trailers_received( stream_obj* s = static_cast<stream_obj*>(stream->annotation); grpc_cronet_transport* t = s->curr_ct; gpr_mu_lock(&s->mu); - memset(&s->state.rs.trailing_metadata, 0, - sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, - s->arena); convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata); if (trailers->count > 0) { s->state.rs.trailing_metadata_valid = true; @@ -1354,36 +1360,28 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { Functions used by upper layers to access transport functionality. */ +inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, gpr_arena* arena) + : arena(arena), + curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)), + curr_gs(gs), + state(arena), + refcount(refcount) { + GRPC_CRONET_STREAM_REF(this, "cronet transport"); + gpr_mu_init(&mu); +} + +inline stream_obj::~stream_obj() { + null_and_maybe_free_read_buffer(this); + /* Clean up read_slice_buffer in case there is unread data. */ + grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer); + GRPC_ERROR_UNREF(state.cancel_error); +} + static int init_stream(grpc_transport* gt, grpc_stream* gs, grpc_stream_refcount* refcount, const void* server_data, gpr_arena* arena) { - stream_obj* s = reinterpret_cast<stream_obj*>(gs); - - s->refcount = refcount; - GRPC_CRONET_STREAM_REF(s, "cronet transport"); - memset(&s->storage, 0, sizeof(s->storage)); - s->storage.head = nullptr; - memset(&s->state, 0, sizeof(s->state)); - s->curr_op = nullptr; - s->cbs = nullptr; - memset(&s->header_array, 0, sizeof(s->header_array)); - memset(&s->state.rs, 0, sizeof(s->state.rs)); - memset(&s->state.ws, 0, sizeof(s->state.ws)); - memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done)); - memset(s->state.state_callback_received, 0, - sizeof(s->state.state_callback_received)); - s->state.fail_state = s->state.flush_read = false; - s->state.cancel_error = nullptr; - s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; - s->state.pending_send_message = false; - s->state.pending_recv_trailing_metadata = false; - s->state.pending_read_from_cronet = false; - - s->curr_gs = gs; - s->curr_ct = reinterpret_cast<grpc_cronet_transport*>(gt); - s->arena = arena; - - gpr_mu_init(&s->mu); + new (gs) stream_obj(gt, gs, refcount, arena); return 0; } @@ -1426,10 +1424,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_closure* then_schedule_closure) { stream_obj* s = reinterpret_cast<stream_obj*>(gs); - null_and_maybe_free_read_buffer(s); - /* Clean up read_slice_buffer in case there is unread data. */ - grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer); - GRPC_ERROR_UNREF(s->state.cancel_error); + s->~stream_obj(); GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } |