diff options
Diffstat (limited to 'src/core/ext/transport/cronet/transport/cronet_transport.cc')
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.cc | 317 |
1 files changed, 158 insertions, 159 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 97e4f7d72b..0b1ddf8839 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -75,17 +75,17 @@ enum e_op_id { /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */ -static void on_stream_ready(bidirectional_stream *); +static void on_stream_ready(bidirectional_stream*); static void on_response_headers_received( - bidirectional_stream *, const bidirectional_stream_header_array *, - const char *); -static void on_write_completed(bidirectional_stream *, const char *); -static void on_read_completed(bidirectional_stream *, char *, int); + bidirectional_stream*, const bidirectional_stream_header_array*, + const char*); +static void on_write_completed(bidirectional_stream*, const char*); +static void on_read_completed(bidirectional_stream*, char*, int); static void on_response_trailers_received( - bidirectional_stream *, const bidirectional_stream_header_array *); -static void on_succeeded(bidirectional_stream *); -static void on_failed(bidirectional_stream *, int); -static void on_canceled(bidirectional_stream *); + bidirectional_stream*, const bidirectional_stream_header_array*); +static void on_succeeded(bidirectional_stream*); +static void on_failed(bidirectional_stream*, int); +static void on_canceled(bidirectional_stream*); static bidirectional_stream_callback cronet_callbacks = { on_stream_ready, on_response_headers_received, @@ -99,8 +99,8 @@ static bidirectional_stream_callback cronet_callbacks = { /* Cronet transport object */ struct grpc_cronet_transport { grpc_transport base; /* must be first element in this structure */ - stream_engine *engine; - char *host; + stream_engine* engine; + char* host; bool use_packet_coalescing; }; typedef struct grpc_cronet_transport grpc_cronet_transport; @@ -109,14 +109,14 @@ typedef struct grpc_cronet_transport grpc_cronet_transport; http://www.catb.org/esr/structure-packing/#_structure_reordering: */ struct read_state { /* vars to store data coming from server */ - char *read_buffer; + char* read_buffer; bool length_field_received; int received_bytes; int remaining_bytes; int length_field; bool compressed; char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]; - char *payload_field; + char* payload_field; bool read_stream_closed; /* vars for holding data destined for the application */ @@ -132,7 +132,7 @@ struct read_state { }; struct write_state { - char *write_buffer; + char* write_buffer; }; /* track state of one stream op */ @@ -150,7 +150,7 @@ struct op_state { bool pending_recv_trailing_metadata; /* Cronet has not issued a callback of a bidirectional read */ bool pending_read_from_cronet; - grpc_error *cancel_error; + grpc_error* cancel_error; /* data structure for storing data coming from server */ struct read_state rs; /* data structure for storing data going to the server */ @@ -161,22 +161,22 @@ struct op_and_state { grpc_transport_stream_op_batch op; struct op_state state; bool done; - struct stream_obj *s; /* Pointer back to the stream object */ - struct op_and_state *next; /* next op_and_state in the linked list */ + struct stream_obj* s; /* Pointer back to the stream object */ + struct op_and_state* next; /* next op_and_state in the linked list */ }; struct op_storage { int num_pending_ops; - struct op_and_state *head; + struct op_and_state* head; }; struct stream_obj { - gpr_arena *arena; - struct op_and_state *oas; - grpc_transport_stream_op_batch *curr_op; - grpc_cronet_transport *curr_ct; - grpc_stream *curr_gs; - bidirectional_stream *cbs; + gpr_arena* arena; + struct op_and_state* oas; + grpc_transport_stream_op_batch* curr_op; + grpc_cronet_transport* curr_ct; + grpc_stream* curr_gs; + bidirectional_stream* cbs; bidirectional_stream_header_array header_array; /* Stream level state. Some state will be tracked both at stream and stream_op @@ -190,7 +190,7 @@ struct stream_obj { gpr_mu mu; /* Refcount object of the stream */ - grpc_stream_refcount *refcount; + grpc_stream_refcount* refcount; }; typedef struct stream_obj stream_obj; @@ -199,30 +199,30 @@ typedef struct stream_obj stream_obj; grpc_cronet_stream_ref((stream), (reason)) #define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \ grpc_cronet_stream_unref((exec_ctx), (stream), (reason)) -void grpc_cronet_stream_ref(stream_obj *s, const char *reason) { +void grpc_cronet_stream_ref(stream_obj* s, const char* reason) { grpc_stream_ref(s->refcount, reason); } -void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s, - const char *reason) { +void grpc_cronet_stream_unref(grpc_exec_ctx* exec_ctx, stream_obj* s, + const char* reason) { grpc_stream_unref(exec_ctx, s->refcount, reason); } #else #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream)) #define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \ grpc_cronet_stream_unref((exec_ctx), (stream)) -void grpc_cronet_stream_ref(stream_obj *s) { grpc_stream_ref(s->refcount); } -void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s) { +void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); } +void grpc_cronet_stream_unref(grpc_exec_ctx* exec_ctx, stream_obj* s) { grpc_stream_unref(exec_ctx, s->refcount); } #endif -static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, - struct op_and_state *oas); +static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx, + struct op_and_state* oas); /* Utility function to translate enum into string for printing */ -static const char *op_result_string(enum e_op_result i) { +static const char* op_result_string(enum e_op_result i) { switch (i) { case ACTION_TAKEN_WITH_CALLBACK: return "ACTION_TAKEN_WITH_CALLBACK"; @@ -234,7 +234,7 @@ static const char *op_result_string(enum e_op_result i) { GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -static const char *op_id_string(enum e_op_id i) { +static const char* op_id_string(enum e_op_id i) { switch (i) { case OP_SEND_INITIAL_METADATA: return "OP_SEND_INITIAL_METADATA"; @@ -268,7 +268,7 @@ static const char *op_id_string(enum e_op_id i) { return "UNKNOWN"; } -static void null_and_maybe_free_read_buffer(stream_obj *s) { +static void null_and_maybe_free_read_buffer(stream_obj* s) { if (s->state.rs.read_buffer && s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) { gpr_free(s->state.rs.read_buffer); @@ -276,7 +276,7 @@ static void null_and_maybe_free_read_buffer(stream_obj *s) { s->state.rs.read_buffer = NULL; } -static void maybe_flush_read(stream_obj *s) { +static void maybe_flush_read(stream_obj* s) { /* To enter flush read state (discarding all the buffered messages in * transport layer), two conditions must be satisfied: 1) non-zero grpc status * has been received, and 2) an op requesting the status code @@ -289,7 +289,7 @@ static void maybe_flush_read(stream_obj *s) { CRONET_LOG(GPR_DEBUG, "%p: Flush read", s); s->state.flush_read = true; null_and_maybe_free_read_buffer(s); - s->state.rs.read_buffer = (char *)gpr_malloc(GRPC_FLUSH_READ_SIZE); + s->state.rs.read_buffer = (char*)gpr_malloc(GRPC_FLUSH_READ_SIZE); if (!s->state.pending_read_from_cronet) { CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, @@ -300,8 +300,8 @@ static void maybe_flush_read(stream_obj *s) { } } -static grpc_error *make_error_with_desc(int error_code, const char *desc) { - grpc_error *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc); +static grpc_error* make_error_with_desc(int error_code, const char* desc) { + grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc); error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code); return error; } @@ -309,13 +309,13 @@ static grpc_error *make_error_with_desc(int error_code, const char *desc) { /* Add a new stream op to op storage. */ -static void add_to_storage(struct stream_obj *s, - grpc_transport_stream_op_batch *op) { - struct op_storage *storage = &s->storage; +static void add_to_storage(struct stream_obj* s, + grpc_transport_stream_op_batch* op) { + struct op_storage* storage = &s->storage; /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ - struct op_and_state *new_op = - (struct op_and_state *)gpr_malloc(sizeof(struct op_and_state)); + struct op_and_state* new_op = + (struct op_and_state*)gpr_malloc(sizeof(struct op_and_state)); memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch)); memset(&new_op->state, 0, sizeof(new_op->state)); new_op->s = s; @@ -339,9 +339,9 @@ static void add_to_storage(struct stream_obj *s, /* Traverse the linked list and delete op and free memory */ -static void remove_from_storage(struct stream_obj *s, - struct op_and_state *oas) { - struct op_and_state *curr; +static void remove_from_storage(struct stream_obj* s, + struct op_and_state* oas) { + struct op_and_state* curr; if (s->storage.head == NULL || oas == NULL) { return; } @@ -373,9 +373,9 @@ static void remove_from_storage(struct stream_obj *s, This can get executed from the Cronet network thread via cronet callback or on the application supplied thread via the perform_stream_op function. */ -static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) { +static void execute_from_storage(grpc_exec_ctx* exec_ctx, stream_obj* s) { gpr_mu_lock(&s->mu); - for (struct op_and_state *curr = s->storage.head; curr != NULL;) { + for (struct op_and_state* curr = s->storage.head; curr != NULL;) { CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done); GPR_ASSERT(curr->done == 0); enum e_op_result result = execute_stream_op(exec_ctx, curr); @@ -383,7 +383,7 @@ static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) { op_result_string(result)); /* if this op is done, then remove it and free memory */ if (curr->done) { - struct op_and_state *next = curr->next; + struct op_and_state* next = curr->next; remove_from_storage(s, curr); curr = next; } @@ -400,11 +400,11 @@ static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) { /* Cronet callback */ -static void on_failed(bidirectional_stream *stream, int net_error) { +static void on_failed(bidirectional_stream* stream, int net_error) { CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - stream_obj *s = (stream_obj *)stream->annotation; + stream_obj* s = (stream_obj*)stream->annotation; gpr_mu_lock(&s->mu); bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_FAILED] = true; @@ -427,11 +427,11 @@ static void on_failed(bidirectional_stream *stream, int net_error) { /* Cronet callback */ -static void on_canceled(bidirectional_stream *stream) { +static void on_canceled(bidirectional_stream* stream) { CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - stream_obj *s = (stream_obj *)stream->annotation; + stream_obj* s = (stream_obj*)stream->annotation; gpr_mu_lock(&s->mu); bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_CANCELED] = true; @@ -454,11 +454,11 @@ static void on_canceled(bidirectional_stream *stream) { /* Cronet callback */ -static void on_succeeded(bidirectional_stream *stream) { +static void on_succeeded(bidirectional_stream* stream) { CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - stream_obj *s = (stream_obj *)stream->annotation; + stream_obj* s = (stream_obj*)stream->annotation; gpr_mu_lock(&s->mu); bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_SUCCEEDED] = true; @@ -473,11 +473,11 @@ static void on_succeeded(bidirectional_stream *stream) { /* Cronet callback */ -static void on_stream_ready(bidirectional_stream *stream) { +static void on_stream_ready(bidirectional_stream* stream) { CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - stream_obj *s = (stream_obj *)stream->annotation; - grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; + stream_obj* s = (stream_obj*)stream->annotation; + grpc_cronet_transport* t = (grpc_cronet_transport*)s->curr_ct; gpr_mu_lock(&s->mu); s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true; @@ -503,13 +503,13 @@ static void on_stream_ready(bidirectional_stream *stream) { Cronet callback */ static void on_response_headers_received( - bidirectional_stream *stream, - const bidirectional_stream_header_array *headers, - const char *negotiated_protocol) { + bidirectional_stream* stream, + const bidirectional_stream_header_array* headers, + const char* negotiated_protocol) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); - stream_obj *s = (stream_obj *)stream->annotation; + stream_obj* s = (stream_obj*)stream->annotation; /* Identify if this is a header or a trailer (in a trailer-only response case) */ @@ -526,15 +526,15 @@ static void on_response_headers_received( grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, s->arena); for (size_t i = 0; i < headers->count; i++) { - GRPC_LOG_IF_ERROR( - "on_response_headers_received", - grpc_chttp2_incoming_metadata_buffer_add( - &exec_ctx, &s->state.rs.initial_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].key)), - grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].value))))); + GRPC_LOG_IF_ERROR("on_response_headers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.initial_metadata, + grpc_mdelem_from_slices( + &exec_ctx, + grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].value))))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -559,9 +559,9 @@ static void on_response_headers_received( /* Cronet callback */ -static void on_write_completed(bidirectional_stream *stream, const char *data) { +static void on_write_completed(bidirectional_stream* stream, const char* data) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - stream_obj *s = (stream_obj *)stream->annotation; + stream_obj* s = (stream_obj*)stream->annotation; CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); gpr_mu_lock(&s->mu); if (s->state.ws.write_buffer) { @@ -577,10 +577,10 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) { /* Cronet callback */ -static void on_read_completed(bidirectional_stream *stream, char *data, +static void on_read_completed(bidirectional_stream* stream, char* data, int count) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - stream_obj *s = (stream_obj *)stream->annotation; + stream_obj* s = (stream_obj*)stream->annotation; CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); gpr_mu_lock(&s->mu); @@ -620,13 +620,13 @@ static void on_read_completed(bidirectional_stream *stream, char *data, Cronet callback */ static void on_response_trailers_received( - bidirectional_stream *stream, - const bidirectional_stream_header_array *trailers) { + bidirectional_stream* stream, + const bidirectional_stream_header_array* trailers) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); - stream_obj *s = (stream_obj *)stream->annotation; - grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; + stream_obj* s = (stream_obj*)stream->annotation; + grpc_cronet_transport* t = (grpc_cronet_transport*)s->curr_ct; gpr_mu_lock(&s->mu); memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata)); @@ -636,15 +636,15 @@ static void on_response_trailers_received( for (size_t i = 0; i < trailers->count; i++) { CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, trailers->headers[i].value); - GRPC_LOG_IF_ERROR( - "on_response_trailers_received", - grpc_chttp2_incoming_metadata_buffer_add( - &exec_ctx, &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].key)), - grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].value))))); + GRPC_LOG_IF_ERROR("on_response_trailers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.trailing_metadata, + grpc_mdelem_from_slices( + &exec_ctx, + grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].value))))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { @@ -679,17 +679,17 @@ static void on_response_trailers_received( Utility function that takes the data from s->write_slice_buffer and assembles into a contiguous byte stream with 5 byte gRPC header prepended. */ -static void create_grpc_frame(grpc_exec_ctx *exec_ctx, - grpc_slice_buffer *write_slice_buffer, - char **pp_write_buffer, - size_t *p_write_buffer_size, uint32_t flags) { +static void create_grpc_frame(grpc_exec_ctx* exec_ctx, + grpc_slice_buffer* write_slice_buffer, + char** pp_write_buffer, + size_t* p_write_buffer_size, uint32_t flags) { grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); size_t length = GRPC_SLICE_LENGTH(slice); *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; /* This is freed in the on_write_completed callback */ - char *write_buffer = (char *)gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES); + char* write_buffer = (char*)gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES); *pp_write_buffer = write_buffer; - uint8_t *p = (uint8_t *)write_buffer; + uint8_t* p = (uint8_t*)write_buffer; /* Append 5 byte header */ /* Compressed flag */ *p++ = (uint8_t)((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0); @@ -707,10 +707,10 @@ static void create_grpc_frame(grpc_exec_ctx *exec_ctx, Convert metadata in a format that Cronet can consume */ static void convert_metadata_to_cronet_headers( - grpc_linked_mdelem *head, const char *host, char **pp_url, - bidirectional_stream_header **pp_headers, size_t *p_num_headers, - const char **method) { - grpc_linked_mdelem *curr = head; + grpc_linked_mdelem* head, const char* host, char** pp_url, + bidirectional_stream_header** pp_headers, size_t* p_num_headers, + const char** method) { + grpc_linked_mdelem* curr = head; /* Walk the linked list and get number of header fields */ size_t num_headers_available = 0; while (curr != NULL) { @@ -719,8 +719,8 @@ static void convert_metadata_to_cronet_headers( } /* Allocate enough memory. It is freed in the on_stream_ready callback */ - bidirectional_stream_header *headers = - (bidirectional_stream_header *)gpr_malloc( + bidirectional_stream_header* headers = + (bidirectional_stream_header*)gpr_malloc( sizeof(bidirectional_stream_header) * num_headers_available); *pp_headers = headers; @@ -734,8 +734,8 @@ static void convert_metadata_to_cronet_headers( while (num_headers < num_headers_available) { grpc_mdelem mdelem = curr->md; curr = curr->next; - char *key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem)); - char *value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem)); + char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem)); + char* value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem)); if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) || grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_AUTHORITY)) { /* Cronet populates these fields on its own */ @@ -772,10 +772,10 @@ static void convert_metadata_to_cronet_headers( *p_num_headers = (size_t)num_headers; } -static void parse_grpc_header(const uint8_t *data, int *length, - bool *compressed) { +static void parse_grpc_header(const uint8_t* data, int* length, + bool* compressed) { const uint8_t c = *data; - const uint8_t *p = data + 1; + const uint8_t* p = data + 1; *compressed = ((c & 0x01) == 0x01); *length = 0; *length |= ((uint8_t)*p++) << 24; @@ -784,7 +784,7 @@ static void parse_grpc_header(const uint8_t *data, int *length, *length |= ((uint8_t)*p++); } -static bool header_has_authority(grpc_linked_mdelem *head) { +static bool header_has_authority(grpc_linked_mdelem* head) { while (head != NULL) { if (grpc_slice_eq(GRPC_MDKEY(head->md), GRPC_MDSTR_AUTHORITY)) { return true; @@ -798,11 +798,11 @@ static bool header_has_authority(grpc_linked_mdelem *head) { Op Execution: Decide if one of the actions contained in the stream op can be executed. This is the heart of the state machine. */ -static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, - struct stream_obj *s, struct op_state *op_state, +static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op, + struct stream_obj* s, struct op_state* op_state, enum e_op_id op_id) { - struct op_state *stream_state = &s->state; - grpc_cronet_transport *t = s->curr_ct; + struct op_state* stream_state = &s->state; + grpc_cronet_transport* t = s->curr_ct; bool result = true; /* When call is canceled, every op can be run, except under following conditions @@ -981,12 +981,12 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, /* TODO (makdharma): Break down this function in smaller chunks for readability. */ -static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, - struct op_and_state *oas) { - grpc_transport_stream_op_batch *stream_op = &oas->op; - struct stream_obj *s = oas->s; - grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; - struct op_state *stream_state = &s->state; +static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx, + struct op_and_state* oas) { + grpc_transport_stream_op_batch* stream_op = &oas->op; + struct stream_obj* s = oas->s; + grpc_cronet_transport* t = (grpc_cronet_transport*)s->curr_ct; + struct op_state* stream_state = &s->state; enum e_op_result result = NO_ACTION_POSSIBLE; if (stream_op->send_initial_metadata && op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) { @@ -1002,8 +1002,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, bidirectional_stream_disable_auto_flush(s->cbs, true); bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); } - char *url = NULL; - const char *method = "POST"; + char* url = NULL; + const char* method = "POST"; s->header_array.headers = NULL; convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata .send_initial_metadata->list.head, @@ -1018,8 +1018,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, unsigned int header_index; for (header_index = 0; header_index < s->header_array.count; header_index++) { - gpr_free((void *)s->header_array.headers[header_index].key); - gpr_free((void *)s->header_array.headers[header_index].value); + gpr_free((void*)s->header_array.headers[header_index].key); + gpr_free((void*)s->header_array.headers[header_index].value); } stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; if (t->use_packet_coalescing) { @@ -1177,14 +1177,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.remaining_bytes == 0) { /* Start a read operation for data */ stream_state->rs.length_field_received = true; - parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer, + parse_grpc_header((const uint8_t*)stream_state->rs.read_buffer, &stream_state->rs.length_field, &stream_state->rs.compressed); CRONET_LOG(GPR_DEBUG, "length field = %d", stream_state->rs.length_field); if (stream_state->rs.length_field > 0) { stream_state->rs.read_buffer = - (char *)gpr_malloc((size_t)stream_state->rs.length_field); + (char*)gpr_malloc((size_t)stream_state->rs.length_field); GPR_ASSERT(stream_state->rs.read_buffer); stream_state->rs.remaining_bytes = stream_state->rs.length_field; stream_state->rs.received_bytes = 0; @@ -1207,9 +1207,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (stream_state->rs.compressed) { stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - *((grpc_byte_buffer **) - stream_op->payload->recv_message.recv_message) = - (grpc_byte_buffer *)&stream_state->rs.sbs; + *((grpc_byte_buffer**)stream_op->payload->recv_message.recv_message) = + (grpc_byte_buffer*)&stream_state->rs.sbs; GRPC_CLOSURE_SCHED( exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); @@ -1250,7 +1249,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "read operation complete"); grpc_slice read_data_slice = GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field); - uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice); + uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice); memcpy(dst_p, stream_state->rs.read_buffer, (size_t)stream_state->rs.length_field); null_and_maybe_free_read_buffer(s); @@ -1265,8 +1264,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (stream_state->rs.compressed) { stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS; } - *((grpc_byte_buffer **)stream_op->payload->recv_message.recv_message) = - (grpc_byte_buffer *)&stream_state->rs.sbs; + *((grpc_byte_buffer**)stream_op->payload->recv_message.recv_message) = + (grpc_byte_buffer*)&stream_state->rs.sbs; GRPC_CLOSURE_SCHED(exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); @@ -1351,10 +1350,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, Functions used by upper layers to access transport functionality. */ -static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data, gpr_arena *arena) { - stream_obj *s = (stream_obj *)gs; +static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena) { + stream_obj* s = (stream_obj*)gs; s->refcount = refcount; GRPC_CRONET_STREAM_REF(s, "cronet transport"); @@ -1377,23 +1376,23 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->state.pending_read_from_cronet = false; s->curr_gs = gs; - s->curr_ct = (grpc_cronet_transport *)gt; + s->curr_ct = (grpc_cronet_transport*)gt; s->arena = arena; gpr_mu_init(&s->mu); return 0; } -static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset *pollset) {} +static void set_pollset_do_nothing(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_pollset* pollset) {} -static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, - grpc_transport *gt, grpc_stream *gs, - grpc_pollset_set *pollset_set) {} +static void set_pollset_set_do_nothing(grpc_exec_ctx* exec_ctx, + grpc_transport* gt, grpc_stream* gs, + grpc_pollset_set* pollset_set) {} -static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, - grpc_transport_stream_op_batch *op) { +static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, + grpc_transport_stream_op_batch* op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); if (op->send_initial_metadata && header_has_authority(op->payload->send_initial_metadata @@ -1413,15 +1412,15 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); return; } - stream_obj *s = (stream_obj *)gs; + stream_obj* s = (stream_obj*)gs; add_to_storage(s, op); execute_from_storage(exec_ctx, s); } -static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, - grpc_closure *then_schedule_closure) { - stream_obj *s = (stream_obj *)gs; +static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, + grpc_closure* then_schedule_closure) { + stream_obj* s = (stream_obj*)gs; null_and_maybe_free_read_buffer(s); /* Clean up read_slice_buffer in case there is unread data. */ grpc_slice_buffer_destroy_internal(exec_ctx, &s->state.rs.read_slice_buffer); @@ -1429,15 +1428,15 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } -static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} +static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) {} -static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, - grpc_transport *gt) { +static grpc_endpoint* get_endpoint(grpc_exec_ctx* exec_ctx, + grpc_transport* gt) { return NULL; } -static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_transport_op *op) {} +static void perform_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_transport_op* op) {} static const grpc_transport_vtable grpc_cronet_vtable = { sizeof(stream_obj), @@ -1451,17 +1450,17 @@ static const grpc_transport_vtable grpc_cronet_vtable = { destroy_transport, get_endpoint}; -grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, - const grpc_channel_args *args, - void *reserved) { - grpc_cronet_transport *ct = - (grpc_cronet_transport *)gpr_malloc(sizeof(grpc_cronet_transport)); +grpc_transport* grpc_create_cronet_transport(void* engine, const char* target, + const grpc_channel_args* args, + void* reserved) { + grpc_cronet_transport* ct = + (grpc_cronet_transport*)gpr_malloc(sizeof(grpc_cronet_transport)); if (!ct) { goto error; } ct->base.vtable = &grpc_cronet_vtable; - ct->engine = (stream_engine *)engine; - ct->host = (char *)gpr_malloc(strlen(target) + 1); + ct->engine = (stream_engine*)engine; + ct->host = (char*)gpr_malloc(strlen(target) + 1); if (!ct->host) { goto error; } |