aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/cronet/transport/cronet_transport.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/cronet/transport/cronet_transport.cc')
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc155
1 files changed, 75 insertions, 80 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 4a252d972d..349d8681d5 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -111,16 +111,21 @@ typedef struct grpc_cronet_transport grpc_cronet_transport;
/* TODO (makdharma): reorder structure for memory efficiency per
http://www.catb.org/esr/structure-packing/#_structure_reordering: */
struct read_state {
+ read_state(gpr_arena* arena)
+ : trailing_metadata(arena), initial_metadata(arena) {
+ grpc_slice_buffer_init(&read_slice_buffer);
+ }
+
/* vars to store data coming from server */
- char* read_buffer;
- bool length_field_received;
- int received_bytes;
- int remaining_bytes;
- int length_field;
- bool compressed;
- char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES];
- char* payload_field;
- bool read_stream_closed;
+ char* read_buffer = nullptr;
+ bool length_field_received = false;
+ int received_bytes = 0;
+ int remaining_bytes = 0;
+ int length_field = 0;
+ bool compressed = 0;
+ char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
+ char* payload_field = nullptr;
+ bool read_stream_closed = 0;
/* vars for holding data destined for the application */
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
@@ -128,59 +133,71 @@ struct read_state {
/* vars for trailing metadata */
grpc_chttp2_incoming_metadata_buffer trailing_metadata;
- bool trailing_metadata_valid;
+ bool trailing_metadata_valid = false;
/* vars for initial metadata */
grpc_chttp2_incoming_metadata_buffer initial_metadata;
};
struct write_state {
- char* write_buffer;
+ char* write_buffer = nullptr;
};
/* track state of one stream op */
struct op_state {
- bool state_op_done[OP_NUM_OPS];
- bool state_callback_received[OP_NUM_OPS];
+ op_state(gpr_arena* arena) : rs(arena) {}
+
+ bool state_op_done[OP_NUM_OPS] = {};
+ bool state_callback_received[OP_NUM_OPS] = {};
/* A non-zero gRPC status code has been seen */
- bool fail_state;
+ bool fail_state = false;
/* Transport is discarding all buffered messages */
- bool flush_read;
- bool flush_cronet_when_ready;
- bool pending_write_for_trailer;
- bool pending_send_message;
+ bool flush_read = false;
+ bool flush_cronet_when_ready = false;
+ bool pending_write_for_trailer = false;
+ bool pending_send_message = false;
/* User requested RECV_TRAILING_METADATA */
- bool pending_recv_trailing_metadata;
+ bool pending_recv_trailing_metadata = false;
/* Cronet has not issued a callback of a bidirectional read */
- bool pending_read_from_cronet;
- grpc_error* cancel_error;
+ bool pending_read_from_cronet = false;
+ grpc_error* cancel_error = GRPC_ERROR_NONE;
/* data structure for storing data coming from server */
struct read_state rs;
/* data structure for storing data going to the server */
struct write_state ws;
};
+struct stream_obj;
+
struct op_and_state {
+ op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
+
grpc_transport_stream_op_batch op;
struct op_state state;
- bool done;
- struct stream_obj* s; /* Pointer back to the stream object */
- struct op_and_state* next; /* next op_and_state in the linked list */
+ bool done = false;
+ struct stream_obj* s; /* Pointer back to the stream object */
+ /* next op_and_state in the linked list */
+ struct op_and_state* next;
};
struct op_storage {
- int num_pending_ops;
- struct op_and_state* head;
+ int num_pending_ops = 0;
+ struct op_and_state* head = nullptr;
};
struct stream_obj {
+ stream_obj(grpc_transport* gt, grpc_stream* gs,
+ grpc_stream_refcount* refcount, gpr_arena* arena);
+ ~stream_obj();
+
gpr_arena* arena;
- struct op_and_state* oas;
- grpc_transport_stream_op_batch* curr_op;
+ struct op_and_state* oas = nullptr;
+ grpc_transport_stream_op_batch* curr_op = nullptr;
grpc_cronet_transport* curr_ct;
grpc_stream* curr_gs;
- bidirectional_stream* cbs;
- bidirectional_stream_header_array header_array;
+ bidirectional_stream* cbs = nullptr;
+ bidirectional_stream_header_array header_array =
+ bidirectional_stream_header_array(); // Zero-initialize the structure.
/* Stream level state. Some state will be tracked both at stream and stream_op
* level */
@@ -195,7 +212,6 @@ struct stream_obj {
/* Refcount object of the stream */
grpc_stream_refcount* refcount;
};
-typedef struct stream_obj stream_obj;
#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
@@ -306,6 +322,10 @@ static grpc_error* make_error_with_desc(int error_code, const char* desc) {
return error;
}
+inline op_and_state::op_and_state(stream_obj* s,
+ const grpc_transport_stream_op_batch& op)
+ : op(op), state(s->arena), s(s), next(s->storage.head) {}
+
/*
Add a new stream op to op storage.
*/
@@ -314,14 +334,8 @@ static void add_to_storage(struct stream_obj* s,
struct op_storage* storage = &s->storage;
/* add new op at the beginning of the linked list. The memory is freed
in remove_from_storage */
- struct op_and_state* new_op = static_cast<struct op_and_state*>(
- gpr_malloc(sizeof(struct op_and_state)));
- memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch));
- memset(&new_op->state, 0, sizeof(new_op->state));
- new_op->s = s;
- new_op->done = false;
+ op_and_state* new_op = grpc_core::New<op_and_state>(s, *op);
gpr_mu_lock(&s->mu);
- new_op->next = storage->head;
storage->head = new_op;
storage->num_pending_ops++;
if (op->send_message) {
@@ -347,7 +361,7 @@ static void remove_from_storage(struct stream_obj* s,
}
if (s->storage.head == oas) {
s->storage.head = oas->next;
- gpr_free(oas);
+ grpc_core::Delete(oas);
s->storage.num_pending_ops--;
CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
s->storage.num_pending_ops);
@@ -358,7 +372,7 @@ static void remove_from_storage(struct stream_obj* s,
s->storage.num_pending_ops--;
CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
s->storage.num_pending_ops);
- gpr_free(oas);
+ grpc_core::Delete(oas);
break;
} else if (GPR_UNLIKELY(curr->next == nullptr)) {
CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
@@ -540,10 +554,6 @@ static void on_response_headers_received(
}
gpr_mu_lock(&s->mu);
- memset(&s->state.rs.initial_metadata, 0,
- sizeof(s->state.rs.initial_metadata));
- grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata,
- s->arena);
convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
@@ -634,11 +644,7 @@ static void on_response_trailers_received(
stream_obj* s = static_cast<stream_obj*>(stream->annotation);
grpc_cronet_transport* t = s->curr_ct;
gpr_mu_lock(&s->mu);
- memset(&s->state.rs.trailing_metadata, 0,
- sizeof(s->state.rs.trailing_metadata));
s->state.rs.trailing_metadata_valid = false;
- grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata,
- s->arena);
convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
if (trailers->count > 0) {
s->state.rs.trailing_metadata_valid = true;
@@ -1287,7 +1293,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_error* error = GRPC_ERROR_NONE;
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
error = GRPC_ERROR_REF(stream_state->cancel_error);
- } else if (stream_state->state_op_done[OP_FAILED]) {
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
} else if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1354,36 +1360,28 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
Functions used by upper layers to access transport functionality.
*/
+inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
+ grpc_stream_refcount* refcount, gpr_arena* arena)
+ : arena(arena),
+ curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
+ curr_gs(gs),
+ state(arena),
+ refcount(refcount) {
+ GRPC_CRONET_STREAM_REF(this, "cronet transport");
+ gpr_mu_init(&mu);
+}
+
+inline stream_obj::~stream_obj() {
+ null_and_maybe_free_read_buffer(this);
+ /* Clean up read_slice_buffer in case there is unread data. */
+ grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
+ GRPC_ERROR_UNREF(state.cancel_error);
+}
+
static int init_stream(grpc_transport* gt, grpc_stream* gs,
grpc_stream_refcount* refcount, const void* server_data,
gpr_arena* arena) {
- stream_obj* s = reinterpret_cast<stream_obj*>(gs);
-
- s->refcount = refcount;
- GRPC_CRONET_STREAM_REF(s, "cronet transport");
- memset(&s->storage, 0, sizeof(s->storage));
- s->storage.head = nullptr;
- memset(&s->state, 0, sizeof(s->state));
- s->curr_op = nullptr;
- s->cbs = nullptr;
- memset(&s->header_array, 0, sizeof(s->header_array));
- memset(&s->state.rs, 0, sizeof(s->state.rs));
- memset(&s->state.ws, 0, sizeof(s->state.ws));
- memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
- memset(s->state.state_callback_received, 0,
- sizeof(s->state.state_callback_received));
- s->state.fail_state = s->state.flush_read = false;
- s->state.cancel_error = nullptr;
- s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false;
- s->state.pending_send_message = false;
- s->state.pending_recv_trailing_metadata = false;
- s->state.pending_read_from_cronet = false;
-
- s->curr_gs = gs;
- s->curr_ct = reinterpret_cast<grpc_cronet_transport*>(gt);
- s->arena = arena;
-
- gpr_mu_init(&s->mu);
+ new (gs) stream_obj(gt, gs, refcount, arena);
return 0;
}
@@ -1426,10 +1424,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
grpc_closure* then_schedule_closure) {
stream_obj* s = reinterpret_cast<stream_obj*>(gs);
- null_and_maybe_free_read_buffer(s);
- /* Clean up read_slice_buffer in case there is unread data. */
- grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer);
- GRPC_ERROR_UNREF(s->state.cancel_error);
+ s->~stream_obj();
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}