diff options
Diffstat (limited to 'src/core/ext')
8 files changed, 838 insertions, 15 deletions
diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c index 07f29bcb27..e7a4abd568 100644 --- a/src/core/ext/client_config/resolver_registry.c +++ b/src/core/ext/client_config/resolver_registry.c @@ -47,7 +47,6 @@ static int g_number_of_resolvers = 0; static char *g_default_resolver_prefix; void grpc_resolver_registry_init(const char *default_resolver_prefix) { - g_number_of_resolvers = 0; g_default_resolver_prefix = gpr_strdup(default_resolver_prefix); } @@ -57,6 +56,13 @@ void grpc_resolver_registry_shutdown(void) { grpc_resolver_factory_unref(g_all_of_the_resolvers[i]); } gpr_free(g_default_resolver_prefix); + // FIXME(ctiller): this should live in grpc_resolver_registry_init, + // however that would have the client_config plugin call this AFTER we start + // registering resolvers from third party plugins, and so they'd never show + // up. + // We likely need some kind of dependency system for plugins.... what form + // that takes is TBD. + g_number_of_resolvers = 0; } void grpc_register_resolver_type(grpc_resolver_factory *factory) { diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 3a5af9f53d..bd45d3825c 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -268,7 +268,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection"); - gpr_atm_no_barrier_store(&c->connected_subchannel, 0xdeadbeef); + gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); } gpr_mu_unlock(&c->mu); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 9ca551f0c5..53633227ac 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -629,9 +629,10 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, check_read_ops(exec_ctx, &t->global); gpr_mu_lock(&t->executor.mu); - if (t->executor.pending_actions != NULL) { - hdr = t->executor.pending_actions; - t->executor.pending_actions = NULL; + if (t->executor.pending_actions_head != NULL) { + hdr = t->executor.pending_actions_head; + t->executor.pending_actions_head = t->executor.pending_actions_tail = + NULL; gpr_mu_unlock(&t->executor.mu); while (hdr != NULL) { hdr->action(exec_ctx, t, hdr->stream, hdr->arg); @@ -686,8 +687,14 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, gpr_free(hdr); continue; } - hdr->next = t->executor.pending_actions; - t->executor.pending_actions = hdr; + hdr->next = NULL; + if (t->executor.pending_actions_head != NULL) { + t->executor.pending_actions_tail = + t->executor.pending_actions_tail->next = hdr; + } else { + t->executor.pending_actions_tail = t->executor.pending_actions_head = + hdr; + } REF_TRANSPORT(t, "pending_action"); gpr_mu_unlock(&t->executor.mu); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 7a8084641d..8c4fa2d34a 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -236,9 +236,6 @@ struct grpc_chttp2_transport_parsing { /** was a goaway frame received? */ uint8_t goaway_received; - /** the last sent max_table_size setting */ - uint32_t last_sent_max_table_size; - /** initial window change */ int64_t initial_window_update; @@ -272,6 +269,9 @@ struct grpc_chttp2_transport_parsing { uint32_t incoming_frame_size; uint32_t incoming_stream_id; + /* current max frame size */ + uint32_t max_frame_size; + /* active parser */ void *parser_data; grpc_chttp2_stream_parsing *incoming_stream; @@ -282,6 +282,8 @@ struct grpc_chttp2_transport_parsing { /* received settings */ uint32_t settings[GRPC_CHTTP2_NUM_SETTINGS]; + /* last settings that were sent */ + uint32_t last_sent_settings[GRPC_CHTTP2_NUM_SETTINGS]; /* goaway data */ grpc_status_code goaway_error; @@ -321,7 +323,8 @@ struct grpc_chttp2_transport { /** is a thread currently parsing */ bool parsing_active; - grpc_chttp2_executor_action_header *pending_actions; + grpc_chttp2_executor_action_header *pending_actions_head; + grpc_chttp2_executor_action_header *pending_actions_tail; } executor; /** is the transport destroying itself? */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index e827a43f7a..2995066e51 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -79,9 +79,12 @@ void grpc_chttp2_prepare_to_read( GPR_TIMER_BEGIN("grpc_chttp2_prepare_to_read", 0); transport_parsing->next_stream_id = transport_global->next_stream_id; - transport_parsing->last_sent_max_table_size = - transport_global->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]; + memcpy(transport_parsing->last_sent_settings, + transport_global->settings[GRPC_SENT_SETTINGS], + sizeof(transport_parsing->last_sent_settings)); + transport_parsing->max_frame_size = + transport_global->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; /* update the parsing view of incoming window */ while (grpc_chttp2_list_pop_unannounced_incoming_window_available( @@ -388,6 +391,12 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx, return 1; } goto dts_fh_0; /* loop */ + } else if (transport_parsing->incoming_frame_size > + transport_parsing->max_frame_size) { + gpr_log(GPR_DEBUG, "Frame size %d is larger than max frame size %d", + transport_parsing->incoming_frame_size, + transport_parsing->max_frame_size); + return 0; } if (++cur == end) { return 1; @@ -840,7 +849,11 @@ static int init_settings_frame_parser( transport_parsing->settings_ack_received = 1; grpc_chttp2_hptbl_set_max_bytes( &transport_parsing->hpack_parser.table, - transport_parsing->last_sent_max_table_size); + transport_parsing + ->last_sent_settings[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); + transport_parsing->max_frame_size = + transport_parsing + ->last_sent_settings[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; } transport_parsing->parser = grpc_chttp2_settings_parser_parse; transport_parsing->parser_data = &transport_parsing->simple.settings; diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c new file mode 100644 index 0000000000..df1acddcc0 --- /dev/null +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -0,0 +1,69 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/impl/codegen/port_platform.h> + +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/transport_impl.h" + +// Cronet transport object +typedef struct cronet_transport { + grpc_transport base; // must be first element in this structure + void *engine; + char *host; +} cronet_transport; + +extern grpc_transport_vtable grpc_cronet_vtable; + +GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( + void *engine, const char *target, const grpc_channel_args *args, + void *reserved) { + cronet_transport *ct = gpr_malloc(sizeof(cronet_transport)); + ct->base.vtable = &grpc_cronet_vtable; + ct->engine = engine; + ct->host = gpr_malloc(strlen(target) + 1); + strcpy(ct->host, target); + gpr_log(GPR_DEBUG, + "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine, + ct->host); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + return grpc_channel_create(&exec_ctx, target, args, + GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); +} diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c new file mode 100644 index 0000000000..687026c9fd --- /dev/null +++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c @@ -0,0 +1,85 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* This file has empty implementation of all the functions exposed by the cronet +library, so we can build it in all environments */ + +#include <stdbool.h> + +#include <grpc/support/log.h> + +#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h" + +#ifdef GRPC_COMPILE_WITH_CRONET +/* link with the real CRONET library in the build system */ +#else +/* Dummy implementation of cronet API just to test for build-ability */ +cronet_bidirectional_stream* cronet_bidirectional_stream_create( + cronet_engine* engine, void* annotation, + cronet_bidirectional_stream_callback* callback) { + GPR_ASSERT(0); + return NULL; +} + +int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_start( + cronet_bidirectional_stream* stream, const char* url, int priority, + const char* method, const cronet_bidirectional_stream_header_array* headers, + bool end_of_stream) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream, + char* buffer, int capacity) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream, + const char* buffer, int count, + bool end_of_stream) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { + GPR_ASSERT(0); + return 0; +} + +#endif /* GRPC_COMPILE_WITH_CRONET */ diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c new file mode 100644 index 0000000000..5bb085195c --- /dev/null +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -0,0 +1,640 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <string.h> + +#include <grpc/impl/codegen/port_platform.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + +#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/transport_impl.h" +#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h" + +#define GRPC_HEADER_SIZE_IN_BYTES 5 + +// Global flag that gets set with GRPC_TRACE env variable +int grpc_cronet_trace = 1; + +// Cronet transport object +struct grpc_cronet_transport { + grpc_transport base; /* must be first element in this structure */ + cronet_engine *engine; + char *host; +}; + +typedef struct grpc_cronet_transport grpc_cronet_transport; + +enum send_state { + CRONET_SEND_IDLE = 0, + CRONET_REQ_STARTED, + CRONET_SEND_HEADER, + CRONET_WRITE, + CRONET_WRITE_COMPLETED, +}; + +enum recv_state { + CRONET_RECV_IDLE = 0, + CRONET_RECV_READ_LENGTH, + CRONET_RECV_READ_DATA, + CRONET_RECV_CLOSED, +}; + +static const char *recv_state_name[] = { + "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,", + "CRONET_RECV_CLOSED"}; + +// Enum that identifies calling function. +enum e_caller { + PERFORM_STREAM_OP, + ON_READ_COMPLETE, + ON_RESPONSE_HEADERS_RECEIVED, + ON_RESPONSE_TRAILERS_RECEIVED +}; + +enum callback_id { + CB_SEND_INITIAL_METADATA = 0, + CB_SEND_MESSAGE, + CB_SEND_TRAILING_METADATA, + CB_RECV_MESSAGE, + CB_RECV_INITIAL_METADATA, + CB_RECV_TRAILING_METADATA, + CB_NUM_CALLBACKS +}; + +struct stream_obj { + // we store received bytes here as they trickle in. + gpr_slice_buffer write_slice_buffer; + cronet_bidirectional_stream *cbs; + gpr_slice slice; + gpr_slice_buffer read_slice_buffer; + struct grpc_slice_buffer_stream sbs; + char *read_buffer; + int remaining_read_bytes; + int total_read_bytes; + + char *write_buffer; + size_t write_buffer_size; + + // Hold the URL + char *url; + + bool response_headers_received; + bool read_requested; + bool response_trailers_received; + bool read_closed; + + // Recv message stuff + grpc_byte_buffer **recv_message; + // Initial metadata stuff + grpc_metadata_batch *recv_initial_metadata; + // Trailing metadata stuff + grpc_metadata_batch *recv_trailing_metadata; + grpc_chttp2_incoming_metadata_buffer imb; + + // This mutex protects receive state machine execution + gpr_mu recv_mu; + // we can queue up up to 2 callbacks for each OP + grpc_closure *callback_list[CB_NUM_CALLBACKS][2]; + + // storage for header + cronet_bidirectional_stream_header *headers; + uint32_t num_headers; + cronet_bidirectional_stream_header_array header_array; + // state tracking + enum recv_state cronet_recv_state; + enum send_state cronet_send_state; +}; + +typedef struct stream_obj stream_obj; + +static void next_send_step(stream_obj *s); +static void next_recv_step(stream_obj *s, enum e_caller caller); + +static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset *pollset) {} + +static void enqueue_callbacks(grpc_closure *callback_list[]) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + if (callback_list[0]) { + grpc_exec_ctx_enqueue(&exec_ctx, callback_list[0], true, NULL); + callback_list[0] = NULL; + } + if (callback_list[1]) { + grpc_exec_ctx_enqueue(&exec_ctx, callback_list[1], true, NULL); + callback_list[1] = NULL; + } + grpc_exec_ctx_finish(&exec_ctx); +} + +static void on_canceled(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_canceled %p", stream); + } +} + +static void on_failed(cronet_bidirectional_stream *stream, int net_error) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error); + } +} + +static void on_succeeded(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_succeeded %p", stream); + } +} + +static void on_response_trailers_received( + cronet_bidirectional_stream *stream, + const cronet_bidirectional_stream_header_array *trailers) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_response_trailers_received"); + } + stream_obj *s = (stream_obj *)stream->annotation; + + memset(&s->imb, 0, sizeof(s->imb)); + grpc_chttp2_incoming_metadata_buffer_init(&s->imb); + unsigned int i = 0; + for (i = 0; i < trailers->count; i++) { + grpc_chttp2_incoming_metadata_buffer_add( + &s->imb, grpc_mdelem_from_metadata_strings( + grpc_mdstr_from_string(trailers->headers[i].key), + grpc_mdstr_from_string(trailers->headers[i].value))); + } + s->response_trailers_received = true; + next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED); +} + +static void on_write_completed(cronet_bidirectional_stream *stream, + const char *data) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: on_write_completed"); + } + stream_obj *s = (stream_obj *)stream->annotation; + enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]); + s->cronet_send_state = CRONET_WRITE_COMPLETED; + next_send_step(s); +} + +static void process_recv_message(stream_obj *s, const uint8_t *recv_data) { + gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes); + uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); + memcpy(dst_p, recv_data, (size_t)s->total_read_bytes); + gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice); + grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0); + *s->recv_message = (grpc_byte_buffer *)&s->sbs; +} + +static int parse_grpc_header(const uint8_t *data) { + const uint8_t *p = data + 1; + int length = 0; + length |= ((uint8_t)*p++) << 24; + length |= ((uint8_t)*p++) << 16; + length |= ((uint8_t)*p++) << 8; + length |= ((uint8_t)*p++); + return length; +} + +static void on_read_completed(cronet_bidirectional_stream *stream, char *data, + int count) { + stream_obj *s = (stream_obj *)stream->annotation; + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d", + count, s->total_read_bytes, s->remaining_read_bytes); + } + if (count > 0) { + GPR_ASSERT(s->recv_message); + s->remaining_read_bytes -= count; + next_recv_step(s, ON_READ_COMPLETE); + } else { + s->read_closed = true; + next_recv_step(s, ON_READ_COMPLETE); + } +} + +static void on_response_headers_received( + cronet_bidirectional_stream *stream, + const cronet_bidirectional_stream_header_array *headers, + const char *negotiated_protocol) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_response_headers_received"); + } + stream_obj *s = (stream_obj *)stream->annotation; + enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]); + s->response_headers_received = true; + next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED); +} + +static void on_request_headers_sent(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: on_request_headers_sent"); + } + stream_obj *s = (stream_obj *)stream->annotation; + enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]); + s->cronet_send_state = CRONET_SEND_HEADER; + next_send_step(s); +} + +// Callback function pointers (invoked by cronet in response to events) +static cronet_bidirectional_stream_callback callbacks = { + on_request_headers_sent, + on_response_headers_received, + on_read_completed, + on_write_completed, + on_response_trailers_received, + on_succeeded, + on_failed, + on_canceled}; + +static void invoke_closing_callback(stream_obj *s) { + grpc_chttp2_incoming_metadata_buffer_publish(&s->imb, + s->recv_trailing_metadata); + if (s->callback_list[CB_RECV_TRAILING_METADATA]) { + enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]); + } +} + +static void set_recv_state(stream_obj *s, enum recv_state state) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]); + } + s->cronet_recv_state = state; +} + +// This is invoked from perform_stream_op, and all on_xxxx callbacks. +static void next_recv_step(stream_obj *s, enum e_caller caller) { + gpr_mu_lock(&s->recv_mu); + switch (s->cronet_recv_state) { + case CRONET_RECV_IDLE: + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE"); + } + if (caller == PERFORM_STREAM_OP || + caller == ON_RESPONSE_HEADERS_RECEIVED) { + if (s->read_closed && s->response_trailers_received) { + invoke_closing_callback(s); + set_recv_state(s, CRONET_RECV_CLOSED); + } else if (s->response_headers_received == true && + s->read_requested == true) { + set_recv_state(s, CRONET_RECV_READ_LENGTH); + s->total_read_bytes = s->remaining_read_bytes = + GRPC_HEADER_SIZE_IN_BYTES; + GPR_ASSERT(s->read_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + } + cronet_bidirectional_stream_read(s->cbs, s->read_buffer, + s->remaining_read_bytes); + } + } + break; + case CRONET_RECV_READ_LENGTH: + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH"); + } + if (caller == ON_READ_COMPLETE) { + if (s->read_closed) { + invoke_closing_callback(s); + enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); + set_recv_state(s, CRONET_RECV_CLOSED); + } else { + GPR_ASSERT(s->remaining_read_bytes == 0); + set_recv_state(s, CRONET_RECV_READ_DATA); + s->total_read_bytes = s->remaining_read_bytes = + parse_grpc_header((const uint8_t *)s->read_buffer); + s->read_buffer = + gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes); + GPR_ASSERT(s->read_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + } + cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, + s->remaining_read_bytes); + } + } + break; + case CRONET_RECV_READ_DATA: + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA"); + } + if (caller == ON_READ_COMPLETE) { + if (s->remaining_read_bytes > 0) { + int offset = s->total_read_bytes - s->remaining_read_bytes; + GPR_ASSERT(s->read_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + } + cronet_bidirectional_stream_read( + s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes); + } else { + gpr_slice_buffer_init(&s->read_slice_buffer); + uint8_t *p = (uint8_t *)s->read_buffer; + process_recv_message(s, p); + set_recv_state(s, CRONET_RECV_IDLE); + enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); + } + } + break; + case CRONET_RECV_CLOSED: + break; + default: + GPR_ASSERT(0); // Should not reach here + break; + } + gpr_mu_unlock(&s->recv_mu); +} + +// This function takes the data from s->write_slice_buffer and assembles into +// a contiguous byte stream with 5 byte gRPC header prepended. +static void create_grpc_frame(stream_obj *s) { + gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer); + uint8_t *raw_data = GPR_SLICE_START_PTR(slice); + size_t length = GPR_SLICE_LENGTH(slice); + s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; + s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size); + uint8_t *p = (uint8_t *)s->write_buffer; + // Append 5 byte header + *p++ = 0; + *p++ = (uint8_t)(length >> 24); + *p++ = (uint8_t)(length >> 16); + *p++ = (uint8_t)(length >> 8); + *p++ = (uint8_t)(length); + // append actual data + memcpy(p, raw_data, length); +} + +static void do_write(stream_obj *s) { + gpr_slice_buffer *sb = &s->write_slice_buffer; + GPR_ASSERT(sb->count <= 1); + if (sb->count > 0) { + create_grpc_frame(s); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + } + cronet_bidirectional_stream_write(s->cbs, s->write_buffer, + (int)s->write_buffer_size, false); + } +} + +// +static void next_send_step(stream_obj *s) { + switch (s->cronet_send_state) { + case CRONET_SEND_IDLE: + GPR_ASSERT( + s->cbs); // cronet_bidirectional_stream is not initialized yet. + s->cronet_send_state = CRONET_REQ_STARTED; + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url); + } + cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST", + &s->header_array, false); + // we no longer need the memory that was allocated earlier. + gpr_free(s->header_array.headers); + break; + case CRONET_SEND_HEADER: + do_write(s); + s->cronet_send_state = CRONET_WRITE; + break; + case CRONET_WRITE_COMPLETED: + do_write(s); + break; + default: + GPR_ASSERT(0); + break; + } +} + +static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head, + const char *host, + stream_obj *s) { + grpc_linked_mdelem *curr = head; + // Walk the linked list and get number of header fields + uint32_t num_headers_available = 0; + while (curr != NULL) { + curr = curr->next; + num_headers_available++; + } + // Allocate enough memory + s->headers = (cronet_bidirectional_stream_header *)gpr_malloc( + sizeof(cronet_bidirectional_stream_header) * num_headers_available); + + // Walk the linked list again, this time copying the header fields. + // s->num_headers + // can be less than num_headers_available, as some headers are not used for + // cronet + curr = head; + s->num_headers = 0; + while (s->num_headers < num_headers_available) { + grpc_mdelem *mdelem = curr->md; + curr = curr->next; + const char *key = grpc_mdstr_as_c_string(mdelem->key); + const char *value = grpc_mdstr_as_c_string(mdelem->value); + if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 || + strcmp(key, ":authority") == 0) { + // Cronet populates these fields on its own. + continue; + } + if (strcmp(key, ":path") == 0) { + // Create URL by appending :path value to the hostname + gpr_asprintf(&s->url, "https://%s%s", host, value); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "extracted URL = %s", s->url); + } + continue; + } + s->headers[s->num_headers].key = key; + s->headers[s->num_headers].value = value; + s->num_headers++; + if (curr == NULL) { + break; + } + } +} + +static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_transport_stream_op *op) { + grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; + GPR_ASSERT(ct->engine); + stream_obj *s = (stream_obj *)gs; + if (op->recv_trailing_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, + "perform_stream_op - recv_trailing_metadata: on_complete=%p", + op->on_complete); + } + s->recv_trailing_metadata = op->recv_trailing_metadata; + GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]); + s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete; + } + if (op->recv_message) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p", + op->on_complete); + } + s->recv_message = (grpc_byte_buffer **)op->recv_message; + GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]); + GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]); + s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready; + s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete; + s->read_requested = true; + next_recv_step(s, PERFORM_STREAM_OP); + } + if (op->recv_initial_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p", + op->on_complete); + } + s->recv_initial_metadata = op->recv_initial_metadata; + GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]); + GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]); + s->callback_list[CB_RECV_INITIAL_METADATA][0] = + op->recv_initial_metadata_ready; + s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete; + } + if (op->send_initial_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, + "perform_stream_op - send_initial_metadata: on_complete=%p", + op->on_complete); + } + s->num_headers = 0; + convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head, + ct->host, s); + s->header_array.count = s->num_headers; + s->header_array.capacity = s->num_headers; + s->header_array.headers = s->headers; + GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]); + s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete; + } + if (op->send_message) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p", + op->on_complete); + } + grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice, + op->send_message->length, NULL); + // Check that compression flag is not ON. We don't support compression yet. + // TODO (makdharma): add compression support + GPR_ASSERT(op->send_message->flags == 0); + gpr_slice_buffer_add(&s->write_slice_buffer, s->slice); + if (s->cbs == NULL) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create"); + } + s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks); + GPR_ASSERT(s->cbs); + s->read_closed = false; + s->response_trailers_received = false; + s->response_headers_received = false; + s->cronet_send_state = CRONET_SEND_IDLE; + s->cronet_recv_state = CRONET_RECV_IDLE; + } + GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]); + s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete; + next_send_step(s); + } + if (op->send_trailing_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, + "perform_stream_op - send_trailing_metadata: on_complete=%p", + op->on_complete); + } + GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]); + s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete; + if (s->cbs) { + // Send an "empty" write to the far end to signal that we're done. + // This will induce the server to send down trailers. + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + } + cronet_bidirectional_stream_write(s->cbs, "abc", 0, true); + } else { + // We never created a stream. This was probably an empty request. + invoke_closing_callback(s); + } + } +} + +static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_stream_refcount *refcount, + const void *server_data) { + stream_obj *s = (stream_obj *)gs; + memset(s->callback_list, 0, sizeof(s->callback_list)); + s->cbs = NULL; + gpr_mu_init(&s->recv_mu); + s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); + s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); + gpr_slice_buffer_init(&s->write_slice_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_transport - init_stream"); + } + return 0; +} + +static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, void *and_free_memory) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "Destroy stream"); + } + stream_obj *s = (stream_obj *)gs; + s->cbs = NULL; + gpr_free(s->read_buffer); + gpr_free(s->write_buffer); + gpr_free(s->url); + gpr_mu_destroy(&s->recv_mu); + if (and_free_memory) { + gpr_free(and_free_memory); + } +} + +static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { + grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; + gpr_free(ct->host); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "Destroy transport"); + } +} + +const grpc_transport_vtable grpc_cronet_vtable = { + sizeof(stream_obj), "cronet_http", init_stream, + set_pollset_do_nothing, perform_stream_op, NULL, + destroy_stream, destroy_transport, NULL}; |