From be5186a79dee5ca96254b6d38fa2401522987f63 Mon Sep 17 00:00:00 2001 From: Makarand Dharmapurikar Date: Wed, 27 Apr 2016 13:47:10 -0700 Subject: created new grpc_cronet.h. Addressed feedback from jcanizales@ and ctiller@ --- .../cronet/client/secure/cronet_channel_create.c | 22 +- .../transport/cronet/transport/cronet_transport.c | 315 ++++++++++++--------- 2 files changed, 194 insertions(+), 143 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c index 914c567086..96baa3984b 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -32,6 +32,9 @@ */ #include + +#ifdef GRPC_COMPILE_WITH_CRONET + #include #include @@ -41,23 +44,20 @@ #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/transport_impl.h" -#ifdef COMPILE_WITH_CRONET // Cronet transport object -struct grpc_cronet_transport { - grpc_transport base; /* must be first element in this structure */ +typedef struct cronet_transport { + grpc_transport base; // must be first element in this structure void *engine; char *host; -}; - -typedef struct grpc_cronet_transport grpc_cronet_transport; +} cronet_transport; -extern grpc_transport_vtable cronet_vtable; +extern grpc_transport_vtable grpc_cronet_vtable; -GRPCAPI grpc_channel *grpc_custom_secure_channel_create( +GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( void *engine, const char *target, const grpc_channel_args *args, void *reserved) { - grpc_cronet_transport *ct = gpr_malloc(sizeof(grpc_cronet_transport)); - ct->base.vtable = &cronet_vtable; + cronet_transport *ct = gpr_malloc(sizeof(cronet_transport)); + ct->base.vtable = &grpc_cronet_vtable; ct->engine = engine; ct->host = gpr_malloc(strlen(target) + 1); strcpy(ct->host, target); @@ -69,4 +69,4 @@ GRPCAPI grpc_channel *grpc_custom_secure_channel_create( return grpc_channel_create(&exec_ctx, target, args, GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); } -#endif // COMPILE_WITH_CRONET +#endif // GRPC_COMPILE_WITH_CRONET diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 5c27a4ea00..c2bb9e0393 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -49,25 +49,9 @@ #include "src/core/lib/transport/transport_impl.h" #include "third_party/objective_c/Cronet/cronet_c_for_grpc.h" -#ifdef COMPILE_WITH_CRONET +#ifdef GRPC_COMPILE_WITH_CRONET #define GRPC_HEADER_SIZE_IN_BYTES 5 -#define MAX_HDRS 100 - -#define GRPC_CRONET_TRACE(...) \ - { \ - if (grpc_cronet_trace) gpr_log(__VA_ARGS__); \ - } -#define CRONET_READ(...) \ - { \ - GRPC_CRONET_TRACE(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); \ - cronet_bidirectional_stream_read(__VA_ARGS__); \ - } -#define SET_RECV_STATE(STATE) \ - { \ - GRPC_CRONET_TRACE(GPR_DEBUG, "next_state = %s", recv_state_name[STATE]); \ - cronet_recv_state = STATE; \ - } // Global flag that gets set with GRPC_TRACE env variable int grpc_cronet_trace = 1; @@ -76,7 +60,7 @@ int grpc_cronet_trace = 1; struct grpc_cronet_transport { grpc_transport base; /* must be first element in this structure */ cronet_engine *engine; - const char *host; + char *host; }; typedef struct grpc_cronet_transport grpc_cronet_transport; @@ -96,7 +80,8 @@ enum recv_state { CRONET_RECV_CLOSED, }; -const char *recv_state_name[] = {"CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", +static const char *recv_state_name[] = {"CRONET_RECV_IDLE", + "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,", "CRONET_RECV_CLOSED"}; @@ -120,21 +105,20 @@ enum callback_id { struct stream_obj { // we store received bytes here as they trickle in. - gpr_slice_buffer write_slicebuffer; + gpr_slice_buffer write_slice_buffer; cronet_bidirectional_stream *cbs; gpr_slice slice; - gpr_slice_buffer read_slicebuffer; + gpr_slice_buffer read_slice_buffer; struct grpc_slice_buffer_stream sbs; char *read_buffer; - uint32_t remaining_read_bytes; - uint32_t total_read_bytes; + int remaining_read_bytes; + int total_read_bytes; char *write_buffer; size_t write_buffer_size; - // + // Hold the URL char *url; - char *host; bool response_headers_received; bool read_requested; @@ -155,57 +139,65 @@ struct stream_obj { grpc_closure *callback_list[CB_NUM_CALLBACKS][2]; // storage for header - cronet_bidirectional_stream_header headers[MAX_HDRS]; + cronet_bidirectional_stream_header *headers; uint32_t num_headers; cronet_bidirectional_stream_header_array header_array; + // state tracking + enum recv_state cronet_recv_state; + enum send_state cronet_send_state; }; typedef struct stream_obj stream_obj; -void next_send_step(stream_obj *s); -void next_recv_step(stream_obj *s, enum e_caller caller); +static void next_send_step(stream_obj *s); +static void next_recv_step(stream_obj *s, enum e_caller caller); -enum send_state cronet_send_state; -enum recv_state cronet_recv_state; static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_pollset *pollset) {} -void enqueue_callbacks(grpc_closure *callback_list[]) { +static void enqueue_callbacks(grpc_closure *callback_list[]) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (callback_list[0]) { - // GRPC_CRONET_TRACE(GPR_DEBUG, "enqueuing callback = %p", - // callback_list[0]); grpc_exec_ctx_enqueue(&exec_ctx, callback_list[0], true, NULL); callback_list[0] = NULL; } if (callback_list[1]) { - // GRPC_CRONET_TRACE(GPR_DEBUG, "enqueuing callback = %p", - // callback_list[1]); grpc_exec_ctx_enqueue(&exec_ctx, callback_list[1], true, NULL); callback_list[1] = NULL; } grpc_exec_ctx_finish(&exec_ctx); } -void on_canceled(cronet_bidirectional_stream *stream) { - GRPC_CRONET_TRACE(GPR_DEBUG, "on_canceled %p", stream); +static void on_canceled(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_canceled %p", stream); + } } -void on_failed(cronet_bidirectional_stream *stream, int net_error) { - GRPC_CRONET_TRACE(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error); + +static void on_failed(cronet_bidirectional_stream *stream, int net_error) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error); + } } -void on_succeded(cronet_bidirectional_stream *stream) { - GRPC_CRONET_TRACE(GPR_DEBUG, "on_succeeded %p", stream); + +static void on_succeeded(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_succeeded %p", stream); + } } -void on_response_trailers_received( + +static void on_response_trailers_received( cronet_bidirectional_stream *stream, const cronet_bidirectional_stream_header_array *trailers) { - GRPC_CRONET_TRACE(GPR_DEBUG, "R: on_response_trailers_received"); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_response_trailers_received"); + } stream_obj *s = (stream_obj *)stream->annotation; memset(&s->imb, 0, sizeof(s->imb)); grpc_chttp2_incoming_metadata_buffer_init(&s->imb); - int i = 0; + unsigned int i = 0; for (i = 0; i < trailers->count; i++) { grpc_chttp2_incoming_metadata_buffer_add( &s->imb, grpc_mdelem_from_metadata_strings( @@ -215,26 +207,29 @@ void on_response_trailers_received( s->response_trailers_received = true; next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED); } -void on_write_completed(cronet_bidirectional_stream *stream, const char *data) { - GRPC_CRONET_TRACE(GPR_DEBUG, "W: on_write_completed"); + +static void on_write_completed(cronet_bidirectional_stream *stream, const char *data) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: on_write_completed"); + } stream_obj *s = (stream_obj *)stream->annotation; enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]); - cronet_send_state = CRONET_WRITE_COMPLETED; + s->cronet_send_state = CRONET_WRITE_COMPLETED; next_send_step(s); } -void process_recv_message(stream_obj *s, const uint8_t *recv_data) { - gpr_slice read_data_slice = gpr_slice_malloc(s->total_read_bytes); +static void process_recv_message(stream_obj *s, const uint8_t *recv_data) { + gpr_slice read_data_slice = gpr_slice_malloc((uint32_t) s->total_read_bytes); uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); memcpy(dst_p, recv_data, s->total_read_bytes); - gpr_slice_buffer_add(&s->read_slicebuffer, read_data_slice); - grpc_slice_buffer_stream_init(&s->sbs, &s->read_slicebuffer, 0); + gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice); + grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0); *s->recv_message = (grpc_byte_buffer *)&s->sbs; } -int parse_grpc_header(const uint8_t *data) { +static int parse_grpc_header(const uint8_t *data) { const uint8_t *p = data + 1; - uint32_t length = 0; + int length = 0; length |= ((uint8_t)*p++) << 24; length |= ((uint8_t)*p++) << 16; length |= ((uint8_t)*p++) << 8; @@ -242,12 +237,13 @@ int parse_grpc_header(const uint8_t *data) { return length; } -void on_read_completed(cronet_bidirectional_stream *stream, char *data, +static void on_read_completed(cronet_bidirectional_stream *stream, char *data, int count) { stream_obj *s = (stream_obj *)stream->annotation; - GRPC_CRONET_TRACE(GPR_DEBUG, - "R: on_read_completed count=%d, total=%d, remaining=%d", + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d", count, s->total_read_bytes, s->remaining_read_bytes); + } if (count > 0) { GPR_ASSERT(s->recv_message); s->remaining_read_bytes -= count; @@ -258,36 +254,40 @@ void on_read_completed(cronet_bidirectional_stream *stream, char *data, } } -void on_response_headers_received( +static void on_response_headers_received( cronet_bidirectional_stream *stream, const cronet_bidirectional_stream_header_array *headers, const char *negotiated_protocol) { - GRPC_CRONET_TRACE(GPR_DEBUG, "R: on_response_headers_received"); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_response_headers_received"); + } stream_obj *s = (stream_obj *)stream->annotation; enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]); s->response_headers_received = true; next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED); } -void on_request_headers_sent(cronet_bidirectional_stream *stream) { - GRPC_CRONET_TRACE(GPR_DEBUG, "W: on_request_headers_sent"); +static void on_request_headers_sent(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: on_request_headers_sent"); + } stream_obj *s = (stream_obj *)stream->annotation; enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]); - cronet_send_state = CRONET_SEND_HEADER; + s->cronet_send_state = CRONET_SEND_HEADER; next_send_step(s); } // Callback function pointers (invoked by cronet in response to events) -cronet_bidirectional_stream_callback callbacks = {on_request_headers_sent, +static cronet_bidirectional_stream_callback callbacks = {on_request_headers_sent, on_response_headers_received, on_read_completed, on_write_completed, on_response_trailers_received, - on_succeded, + on_succeeded, on_failed, on_canceled}; -void invoke_closing_callback(stream_obj *s) { +static void invoke_closing_callback(stream_obj *s) { grpc_chttp2_incoming_metadata_buffer_publish(&s->imb, s->recv_trailing_metadata); if (s->callback_list[CB_RECV_TRAILING_METADATA]) { @@ -295,59 +295,75 @@ void invoke_closing_callback(stream_obj *s) { } } +static void set_recv_state(stream_obj *s, enum recv_state state) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]); + } + s->cronet_recv_state = state; +} + + // This is invoked from perform_stream_op, and all on_xxxx callbacks. -void next_recv_step(stream_obj *s, enum e_caller caller) { +static void next_recv_step(stream_obj *s, enum e_caller caller) { gpr_mu_lock(&s->recv_mu); - switch (cronet_recv_state) { + switch (s->cronet_recv_state) { case CRONET_RECV_IDLE: - GRPC_CRONET_TRACE(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE"); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE"); + } if (caller == PERFORM_STREAM_OP || caller == ON_RESPONSE_HEADERS_RECEIVED) { if (s->read_closed && s->response_trailers_received) { invoke_closing_callback(s); - SET_RECV_STATE(CRONET_RECV_CLOSED); + set_recv_state(s, CRONET_RECV_CLOSED); } else if (s->response_headers_received == true && s->read_requested == true) { - SET_RECV_STATE(CRONET_RECV_READ_LENGTH); + set_recv_state(s, CRONET_RECV_READ_LENGTH); s->total_read_bytes = s->remaining_read_bytes = GRPC_HEADER_SIZE_IN_BYTES; GPR_ASSERT(s->read_buffer); - CRONET_READ(s->cbs, s->read_buffer, s->remaining_read_bytes); + if (grpc_cronet_trace) {gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");} + cronet_bidirectional_stream_read(s->cbs, s->read_buffer, s->remaining_read_bytes); } } break; case CRONET_RECV_READ_LENGTH: - GRPC_CRONET_TRACE(GPR_DEBUG, - "cronet_recv_state = CRONET_RECV_READ_LENGTH"); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH"); + } if (caller == ON_READ_COMPLETE) { if (s->read_closed) { invoke_closing_callback(s); enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); - SET_RECV_STATE(CRONET_RECV_CLOSED); + set_recv_state(s, CRONET_RECV_CLOSED); } else { GPR_ASSERT(s->remaining_read_bytes == 0); - SET_RECV_STATE(CRONET_RECV_READ_DATA); + set_recv_state(s, CRONET_RECV_READ_DATA); s->total_read_bytes = s->remaining_read_bytes = - parse_grpc_header(s->read_buffer); - s->read_buffer = gpr_realloc(s->read_buffer, s->remaining_read_bytes); + parse_grpc_header((const uint8_t *)s->read_buffer); + s->read_buffer = gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes); GPR_ASSERT(s->read_buffer); - CRONET_READ(s->cbs, (char *)s->read_buffer, s->remaining_read_bytes); + if (grpc_cronet_trace) {gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");} + cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, s->remaining_read_bytes); } } break; case CRONET_RECV_READ_DATA: - GRPC_CRONET_TRACE(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA"); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA"); + } if (caller == ON_READ_COMPLETE) { if (s->remaining_read_bytes > 0) { int offset = s->total_read_bytes - s->remaining_read_bytes; GPR_ASSERT(s->read_buffer); - CRONET_READ(s->cbs, (char *)s->read_buffer + offset, + if (grpc_cronet_trace) {gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");} + cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes); } else { - gpr_slice_buffer_init(&s->read_slicebuffer); - uint8_t *p = s->read_buffer; + gpr_slice_buffer_init(&s->read_slice_buffer); + uint8_t *p = (uint8_t *)s->read_buffer; process_recv_message(s, p); - SET_RECV_STATE(CRONET_RECV_IDLE); + set_recv_state(s, CRONET_RECV_IDLE); enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); } } @@ -361,15 +377,15 @@ void next_recv_step(stream_obj *s, enum e_caller caller) { gpr_mu_unlock(&s->recv_mu); } -// This function takes the data from s->write_slicebuffer and assembles into +// This function takes the data from s->write_slice_buffer and assembles into // a contiguous byte stream with 5 byte gRPC header prepended. -void create_grpc_frame(stream_obj *s) { - gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slicebuffer); +static void create_grpc_frame(stream_obj *s) { + gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer); uint8_t *raw_data = GPR_SLICE_START_PTR(slice); size_t length = GPR_SLICE_LENGTH(slice); s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size); - uint8_t *p = s->write_buffer; + uint8_t *p = (uint8_t *)s->write_buffer; // Append 5 byte header *p++ = 0; *p++ = (uint8_t)(length >> 24); @@ -380,32 +396,37 @@ void create_grpc_frame(stream_obj *s) { memcpy(p, raw_data, length); } -void do_write(stream_obj *s) { - gpr_slice_buffer *sb = &s->write_slicebuffer; +static void do_write(stream_obj *s) { + gpr_slice_buffer *sb = &s->write_slice_buffer; GPR_ASSERT(sb->count <= 1); if (sb->count > 0) { create_grpc_frame(s); - GRPC_CRONET_TRACE(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + } cronet_bidirectional_stream_write(s->cbs, s->write_buffer, (int)s->write_buffer_size, false); } } // -void next_send_step(stream_obj *s) { - switch (cronet_send_state) { +static void next_send_step(stream_obj *s) { + switch (s->cronet_send_state) { case CRONET_SEND_IDLE: GPR_ASSERT( s->cbs); // cronet_bidirectional_stream is not initialized yet. - cronet_send_state = CRONET_REQ_STARTED; - GRPC_CRONET_TRACE(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", - s->url); + s->cronet_send_state = CRONET_REQ_STARTED; + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url); + } cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST", &s->header_array, false); + // we no longer need the memory that was allocated earlier. + gpr_free(s->header_array.headers); break; case CRONET_SEND_HEADER: do_write(s); - cronet_send_state = CRONET_WRITE; + s->cronet_send_state = CRONET_WRITE; break; case CRONET_WRITE_COMPLETED: do_write(s); @@ -416,19 +437,25 @@ void next_send_step(stream_obj *s) { } } -void create_url(const char *path, const char *host, stream_obj *s) { - const char prefix[] = "https://"; - s->url = gpr_malloc(strlen(prefix) + strlen(host) + strlen(path) + 1); - strcpy(s->url, prefix); - strcat(s->url, host); - strcat(s->url, path); -} - static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head, const char *host, stream_obj *s) { grpc_linked_mdelem *curr = head; - while (s->num_headers < MAX_HDRS) { + // Walk the linked list and get number of header fields + uint32_t num_headers_available = 0; + while (curr != NULL) { + curr = curr->next; + num_headers_available++; + } + // Allocate enough memory + s->headers = (cronet_bidirectional_stream_header *) + gpr_malloc(sizeof(cronet_bidirectional_stream_header) * num_headers_available); + + // Walk the linked list again, this time copying the header fields. s->num_headers + // can be less than num_headers_available, as some headers are not used for cronet + curr = head; + s->num_headers = 0; + while (s->num_headers < num_headers_available) { grpc_mdelem *mdelem = curr->md; curr = curr->next; const char *key = grpc_mdstr_as_c_string(mdelem->key); @@ -440,8 +467,10 @@ static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head, } if (strcmp(key, ":path") == 0) { // Create URL by appending :path value to the hostname - create_url(value, host, s); - GRPC_CRONET_TRACE(GPR_DEBUG, "extracted URL = %s", s->url); + gpr_asprintf(&s->url, "https://%s%s", host, value); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "extracted URL = %s", s->url); + } continue; } s->headers[s->num_headers].key = key; @@ -459,18 +488,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_ASSERT(ct->engine); stream_obj *s = (stream_obj *)gs; if (op->recv_trailing_metadata) { - GRPC_CRONET_TRACE( + if (grpc_cronet_trace) { + gpr_log( GPR_DEBUG, "perform_stream_op - recv_trailing_metadata: on_complete=%p", op->on_complete); + } s->recv_trailing_metadata = op->recv_trailing_metadata; GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]); s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete; } if (op->recv_message) { - GRPC_CRONET_TRACE(GPR_DEBUG, - "perform_stream_op - recv_message: on_complete=%p", - op->on_complete); - s->recv_message = op->recv_message; + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p", + op->on_complete); + } + s->recv_message = (grpc_byte_buffer **)op->recv_message; GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]); GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]); s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready; @@ -479,9 +511,10 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, next_recv_step(s, PERFORM_STREAM_OP); } if (op->recv_initial_metadata) { - GRPC_CRONET_TRACE(GPR_DEBUG, - "perform_stream_op - recv_initial_metadata:=%p", - op->on_complete); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p", + op->on_complete); + } s->recv_initial_metadata = op->recv_initial_metadata; GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]); GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]); @@ -490,9 +523,11 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete; } if (op->send_initial_metadata) { - GRPC_CRONET_TRACE( + if (grpc_cronet_trace) { + gpr_log( GPR_DEBUG, "perform_stream_op - send_initial_metadata: on_complete=%p", op->on_complete); + } s->num_headers = 0; convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head, ct->host, s); @@ -503,36 +538,45 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete; } if (op->send_message) { - GRPC_CRONET_TRACE(GPR_DEBUG, - "perform_stream_op - send_message: on_complete=%p", - op->on_complete); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p", + op->on_complete); + } grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice, op->send_message->length, NULL); - gpr_slice_buffer_add(&s->write_slicebuffer, s->slice); + // Check that compression flag is not ON. We don't support compression yet. + // TODO (makdharma): add compression support + GPR_ASSERT(op->send_message->flags == 0); + gpr_slice_buffer_add(&s->write_slice_buffer, s->slice); if (s->cbs == NULL) { - GRPC_CRONET_TRACE(GPR_DEBUG, "cronet_bidirectional_stream_create"); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create"); + } s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks); GPR_ASSERT(s->cbs); s->read_closed = false; s->response_trailers_received = false; s->response_headers_received = false; - cronet_send_state = CRONET_SEND_IDLE; - cronet_recv_state = CRONET_RECV_IDLE; + s->cronet_send_state = CRONET_SEND_IDLE; + s->cronet_recv_state = CRONET_RECV_IDLE; } GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]); s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete; next_send_step(s); } if (op->send_trailing_metadata) { - GRPC_CRONET_TRACE( - GPR_DEBUG, "perform_stream_op - send_trailing_metadata: on_complete=%p", + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - send_trailing_metadata: on_complete=%p", op->on_complete); + } GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]); s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete; if (s->cbs) { // Send an "empty" write to the far end to signal that we're done. // This will induce the server to send down trailers. - GRPC_CRONET_TRACE(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + } cronet_bidirectional_stream_write(s->cbs, "abc", 0, true); } else { // We never created a stream. This was probably an empty request. @@ -550,34 +594,41 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_mu_init(&s->recv_mu); s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); - gpr_slice_buffer_init(&s->write_slicebuffer); - GRPC_CRONET_TRACE(GPR_DEBUG, "cronet_transport - init_stream"); + gpr_slice_buffer_init(&s->write_slice_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_transport - init_stream"); + } return 0; } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs) { - GRPC_CRONET_TRACE(GPR_DEBUG, "Destroy stream"); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "Destroy stream"); + } stream_obj *s = (stream_obj *)gs; s->cbs = NULL; gpr_free(s->read_buffer); gpr_free(s->write_buffer); + gpr_free(s->url); gpr_mu_destroy(&s->recv_mu); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; gpr_free(ct->host); - GRPC_CRONET_TRACE(GPR_DEBUG, "Destroy transport"); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "Destroy transport"); + } } -const grpc_transport_vtable cronet_vtable = {sizeof(stream_obj), +const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), "cronet_http", init_stream, set_pollset_do_nothing, perform_stream_op, + NULL, destroy_stream, destroy_transport, - NULL, NULL}; -#endif // COMPILE_WITH_CRONET +#endif // GRPC_COMPILE_WITH_CRONET -- cgit v1.2.3