aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c3
-rw-r--r--src/core/ext/transport/cronet/client/secure/cronet_channel_create.c69
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_api_dummy.c85
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c640
4 files changed, 796 insertions, 1 deletions
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index a74c1a27eb..629959b7c0 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -178,6 +178,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
grpc_server_security_connector *sc = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *err = GRPC_ERROR_NONE;
+ grpc_error **errors = NULL;
GRPC_API_TRACE(
"grpc_server_add_secure_http2_port("
@@ -220,7 +221,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
gpr_mu_init(&state->mu);
gpr_ref_init(&state->refcount, 1);
- grpc_error **errors = gpr_malloc(sizeof(*errors) * resolved->naddrs);
+ errors = gpr_malloc(sizeof(*errors) * resolved->naddrs);
for (i = 0; i < resolved->naddrs; i++) {
errors[i] = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
new file mode 100644
index 0000000000..df1acddcc0
--- /dev/null
+++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/transport_impl.h"
+
+// Cronet transport object
+typedef struct cronet_transport {
+ grpc_transport base; // must be first element in this structure
+ void *engine;
+ char *host;
+} cronet_transport;
+
+extern grpc_transport_vtable grpc_cronet_vtable;
+
+GRPCAPI grpc_channel *grpc_cronet_secure_channel_create(
+ void *engine, const char *target, const grpc_channel_args *args,
+ void *reserved) {
+ cronet_transport *ct = gpr_malloc(sizeof(cronet_transport));
+ ct->base.vtable = &grpc_cronet_vtable;
+ ct->engine = engine;
+ ct->host = gpr_malloc(strlen(target) + 1);
+ strcpy(ct->host, target);
+ gpr_log(GPR_DEBUG,
+ "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine,
+ ct->host);
+
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ return grpc_channel_create(&exec_ctx, target, args,
+ GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct);
+}
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
new file mode 100644
index 0000000000..687026c9fd
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* This file has empty implementation of all the functions exposed by the cronet
+library, so we can build it in all environments */
+
+#include <stdbool.h>
+
+#include <grpc/support/log.h>
+
+#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+
+#ifdef GRPC_COMPILE_WITH_CRONET
+/* link with the real CRONET library in the build system */
+#else
+/* Dummy implementation of cronet API just to test for build-ability */
+cronet_bidirectional_stream* cronet_bidirectional_stream_create(
+ cronet_engine* engine, void* annotation,
+ cronet_bidirectional_stream_callback* callback) {
+ GPR_ASSERT(0);
+ return NULL;
+}
+
+int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_start(
+ cronet_bidirectional_stream* stream, const char* url, int priority,
+ const char* method, const cronet_bidirectional_stream_header_array* headers,
+ bool end_of_stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream,
+ char* buffer, int capacity) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
+ const char* buffer, int count,
+ bool end_of_stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+#endif /* GRPC_COMPILE_WITH_CRONET */
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
new file mode 100644
index 0000000000..ea0f2bcba4
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -0,0 +1,640 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/impl/codegen/port_platform.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/transport_impl.h"
+#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+
+#define GRPC_HEADER_SIZE_IN_BYTES 5
+
+// Global flag that gets set with GRPC_TRACE env variable
+int grpc_cronet_trace = 1;
+
+// Cronet transport object
+struct grpc_cronet_transport {
+ grpc_transport base; /* must be first element in this structure */
+ cronet_engine *engine;
+ char *host;
+};
+
+typedef struct grpc_cronet_transport grpc_cronet_transport;
+
+enum send_state {
+ CRONET_SEND_IDLE = 0,
+ CRONET_REQ_STARTED,
+ CRONET_SEND_HEADER,
+ CRONET_WRITE,
+ CRONET_WRITE_COMPLETED,
+};
+
+enum recv_state {
+ CRONET_RECV_IDLE = 0,
+ CRONET_RECV_READ_LENGTH,
+ CRONET_RECV_READ_DATA,
+ CRONET_RECV_CLOSED,
+};
+
+static const char *recv_state_name[] = {
+ "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,",
+ "CRONET_RECV_CLOSED"};
+
+// Enum that identifies calling function.
+enum e_caller {
+ PERFORM_STREAM_OP,
+ ON_READ_COMPLETE,
+ ON_RESPONSE_HEADERS_RECEIVED,
+ ON_RESPONSE_TRAILERS_RECEIVED
+};
+
+enum callback_id {
+ CB_SEND_INITIAL_METADATA = 0,
+ CB_SEND_MESSAGE,
+ CB_SEND_TRAILING_METADATA,
+ CB_RECV_MESSAGE,
+ CB_RECV_INITIAL_METADATA,
+ CB_RECV_TRAILING_METADATA,
+ CB_NUM_CALLBACKS
+};
+
+struct stream_obj {
+ // we store received bytes here as they trickle in.
+ gpr_slice_buffer write_slice_buffer;
+ cronet_bidirectional_stream *cbs;
+ gpr_slice slice;
+ gpr_slice_buffer read_slice_buffer;
+ struct grpc_slice_buffer_stream sbs;
+ char *read_buffer;
+ int remaining_read_bytes;
+ int total_read_bytes;
+
+ char *write_buffer;
+ size_t write_buffer_size;
+
+ // Hold the URL
+ char *url;
+
+ bool response_headers_received;
+ bool read_requested;
+ bool response_trailers_received;
+ bool read_closed;
+
+ // Recv message stuff
+ grpc_byte_buffer **recv_message;
+ // Initial metadata stuff
+ grpc_metadata_batch *recv_initial_metadata;
+ // Trailing metadata stuff
+ grpc_metadata_batch *recv_trailing_metadata;
+ grpc_chttp2_incoming_metadata_buffer imb;
+
+ // This mutex protects receive state machine execution
+ gpr_mu recv_mu;
+ // we can queue up up to 2 callbacks for each OP
+ grpc_closure *callback_list[CB_NUM_CALLBACKS][2];
+
+ // storage for header
+ cronet_bidirectional_stream_header *headers;
+ uint32_t num_headers;
+ cronet_bidirectional_stream_header_array header_array;
+ // state tracking
+ enum recv_state cronet_recv_state;
+ enum send_state cronet_send_state;
+};
+
+typedef struct stream_obj stream_obj;
+
+static void next_send_step(stream_obj *s);
+static void next_recv_step(stream_obj *s, enum e_caller caller);
+
+static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset *pollset) {}
+
+static void enqueue_callbacks(grpc_closure *callback_list[]) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ if (callback_list[0]) {
+ grpc_exec_ctx_push(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL);
+ callback_list[0] = NULL;
+ }
+ if (callback_list[1]) {
+ grpc_exec_ctx_push(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL);
+ callback_list[1] = NULL;
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void on_canceled(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_canceled %p", stream);
+ }
+}
+
+static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error);
+ }
+}
+
+static void on_succeeded(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_succeeded %p", stream);
+ }
+}
+
+static void on_response_trailers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *trailers) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_response_trailers_received");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+
+ memset(&s->imb, 0, sizeof(s->imb));
+ grpc_chttp2_incoming_metadata_buffer_init(&s->imb);
+ unsigned int i = 0;
+ for (i = 0; i < trailers->count; i++) {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->imb, grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(trailers->headers[i].key),
+ grpc_mdstr_from_string(trailers->headers[i].value)));
+ }
+ s->response_trailers_received = true;
+ next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED);
+}
+
+static void on_write_completed(cronet_bidirectional_stream *stream,
+ const char *data) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: on_write_completed");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]);
+ s->cronet_send_state = CRONET_WRITE_COMPLETED;
+ next_send_step(s);
+}
+
+static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
+ gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
+ uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
+ memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
+ gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+ grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
+ *s->recv_message = (grpc_byte_buffer *)&s->sbs;
+}
+
+static int parse_grpc_header(const uint8_t *data) {
+ const uint8_t *p = data + 1;
+ int length = 0;
+ length |= ((uint8_t)*p++) << 24;
+ length |= ((uint8_t)*p++) << 16;
+ length |= ((uint8_t)*p++) << 8;
+ length |= ((uint8_t)*p++);
+ return length;
+}
+
+static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
+ int count) {
+ stream_obj *s = (stream_obj *)stream->annotation;
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d",
+ count, s->total_read_bytes, s->remaining_read_bytes);
+ }
+ if (count > 0) {
+ GPR_ASSERT(s->recv_message);
+ s->remaining_read_bytes -= count;
+ next_recv_step(s, ON_READ_COMPLETE);
+ } else {
+ s->read_closed = true;
+ next_recv_step(s, ON_READ_COMPLETE);
+ }
+}
+
+static void on_response_headers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *headers,
+ const char *negotiated_protocol) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_response_headers_received");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]);
+ s->response_headers_received = true;
+ next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED);
+}
+
+static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: on_request_headers_sent");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]);
+ s->cronet_send_state = CRONET_SEND_HEADER;
+ next_send_step(s);
+}
+
+// Callback function pointers (invoked by cronet in response to events)
+static cronet_bidirectional_stream_callback callbacks = {
+ on_request_headers_sent,
+ on_response_headers_received,
+ on_read_completed,
+ on_write_completed,
+ on_response_trailers_received,
+ on_succeeded,
+ on_failed,
+ on_canceled};
+
+static void invoke_closing_callback(stream_obj *s) {
+ grpc_chttp2_incoming_metadata_buffer_publish(&s->imb,
+ s->recv_trailing_metadata);
+ if (s->callback_list[CB_RECV_TRAILING_METADATA]) {
+ enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]);
+ }
+}
+
+static void set_recv_state(stream_obj *s, enum recv_state state) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]);
+ }
+ s->cronet_recv_state = state;
+}
+
+// This is invoked from perform_stream_op, and all on_xxxx callbacks.
+static void next_recv_step(stream_obj *s, enum e_caller caller) {
+ gpr_mu_lock(&s->recv_mu);
+ switch (s->cronet_recv_state) {
+ case CRONET_RECV_IDLE:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE");
+ }
+ if (caller == PERFORM_STREAM_OP ||
+ caller == ON_RESPONSE_HEADERS_RECEIVED) {
+ if (s->read_closed && s->response_trailers_received) {
+ invoke_closing_callback(s);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ } else if (s->response_headers_received == true &&
+ s->read_requested == true) {
+ set_recv_state(s, CRONET_RECV_READ_LENGTH);
+ s->total_read_bytes = s->remaining_read_bytes =
+ GRPC_HEADER_SIZE_IN_BYTES;
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(s->cbs, s->read_buffer,
+ s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_LENGTH:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH");
+ }
+ if (caller == ON_READ_COMPLETE) {
+ if (s->read_closed) {
+ invoke_closing_callback(s);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ } else {
+ GPR_ASSERT(s->remaining_read_bytes == 0);
+ set_recv_state(s, CRONET_RECV_READ_DATA);
+ s->total_read_bytes = s->remaining_read_bytes =
+ parse_grpc_header((const uint8_t *)s->read_buffer);
+ s->read_buffer =
+ gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes);
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
+ s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_DATA:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA");
+ }
+ if (caller == ON_READ_COMPLETE) {
+ if (s->remaining_read_bytes > 0) {
+ int offset = s->total_read_bytes - s->remaining_read_bytes;
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(
+ s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes);
+ } else {
+ gpr_slice_buffer_init(&s->read_slice_buffer);
+ uint8_t *p = (uint8_t *)s->read_buffer;
+ process_recv_message(s, p);
+ set_recv_state(s, CRONET_RECV_IDLE);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ }
+ }
+ break;
+ case CRONET_RECV_CLOSED:
+ break;
+ default:
+ GPR_ASSERT(0); // Should not reach here
+ break;
+ }
+ gpr_mu_unlock(&s->recv_mu);
+}
+
+// This function takes the data from s->write_slice_buffer and assembles into
+// a contiguous byte stream with 5 byte gRPC header prepended.
+static void create_grpc_frame(stream_obj *s) {
+ gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer);
+ uint8_t *raw_data = GPR_SLICE_START_PTR(slice);
+ size_t length = GPR_SLICE_LENGTH(slice);
+ s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
+ s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size);
+ uint8_t *p = (uint8_t *)s->write_buffer;
+ // Append 5 byte header
+ *p++ = 0;
+ *p++ = (uint8_t)(length >> 24);
+ *p++ = (uint8_t)(length >> 16);
+ *p++ = (uint8_t)(length >> 8);
+ *p++ = (uint8_t)(length);
+ // append actual data
+ memcpy(p, raw_data, length);
+}
+
+static void do_write(stream_obj *s) {
+ gpr_slice_buffer *sb = &s->write_slice_buffer;
+ GPR_ASSERT(sb->count <= 1);
+ if (sb->count > 0) {
+ create_grpc_frame(s);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ }
+ cronet_bidirectional_stream_write(s->cbs, s->write_buffer,
+ (int)s->write_buffer_size, false);
+ }
+}
+
+//
+static void next_send_step(stream_obj *s) {
+ switch (s->cronet_send_state) {
+ case CRONET_SEND_IDLE:
+ GPR_ASSERT(
+ s->cbs); // cronet_bidirectional_stream is not initialized yet.
+ s->cronet_send_state = CRONET_REQ_STARTED;
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url);
+ }
+ cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST",
+ &s->header_array, false);
+ // we no longer need the memory that was allocated earlier.
+ gpr_free(s->header_array.headers);
+ break;
+ case CRONET_SEND_HEADER:
+ do_write(s);
+ s->cronet_send_state = CRONET_WRITE;
+ break;
+ case CRONET_WRITE_COMPLETED:
+ do_write(s);
+ break;
+ default:
+ GPR_ASSERT(0);
+ break;
+ }
+}
+
+static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head,
+ const char *host,
+ stream_obj *s) {
+ grpc_linked_mdelem *curr = head;
+ // Walk the linked list and get number of header fields
+ uint32_t num_headers_available = 0;
+ while (curr != NULL) {
+ curr = curr->next;
+ num_headers_available++;
+ }
+ // Allocate enough memory
+ s->headers = (cronet_bidirectional_stream_header *)gpr_malloc(
+ sizeof(cronet_bidirectional_stream_header) * num_headers_available);
+
+ // Walk the linked list again, this time copying the header fields.
+ // s->num_headers
+ // can be less than num_headers_available, as some headers are not used for
+ // cronet
+ curr = head;
+ s->num_headers = 0;
+ while (s->num_headers < num_headers_available) {
+ grpc_mdelem *mdelem = curr->md;
+ curr = curr->next;
+ const char *key = grpc_mdstr_as_c_string(mdelem->key);
+ const char *value = grpc_mdstr_as_c_string(mdelem->value);
+ if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
+ strcmp(key, ":authority") == 0) {
+ // Cronet populates these fields on its own.
+ continue;
+ }
+ if (strcmp(key, ":path") == 0) {
+ // Create URL by appending :path value to the hostname
+ gpr_asprintf(&s->url, "https://%s%s", host, value);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "extracted URL = %s", s->url);
+ }
+ continue;
+ }
+ s->headers[s->num_headers].key = key;
+ s->headers[s->num_headers].value = value;
+ s->num_headers++;
+ if (curr == NULL) {
+ break;
+ }
+ }
+}
+
+static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ GPR_ASSERT(ct->engine);
+ stream_obj *s = (stream_obj *)gs;
+ if (op->recv_trailing_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - recv_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ s->recv_trailing_metadata = op->recv_trailing_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]);
+ s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete;
+ }
+ if (op->recv_message) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p",
+ op->on_complete);
+ }
+ s->recv_message = (grpc_byte_buffer **)op->recv_message;
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]);
+ s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready;
+ s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete;
+ s->read_requested = true;
+ next_recv_step(s, PERFORM_STREAM_OP);
+ }
+ if (op->recv_initial_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p",
+ op->on_complete);
+ }
+ s->recv_initial_metadata = op->recv_initial_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]);
+ s->callback_list[CB_RECV_INITIAL_METADATA][0] =
+ op->recv_initial_metadata_ready;
+ s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete;
+ }
+ if (op->send_initial_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - send_initial_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ s->num_headers = 0;
+ convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head,
+ ct->host, s);
+ s->header_array.count = s->num_headers;
+ s->header_array.capacity = s->num_headers;
+ s->header_array.headers = s->headers;
+ GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]);
+ s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete;
+ }
+ if (op->send_message) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p",
+ op->on_complete);
+ }
+ grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice,
+ op->send_message->length, NULL);
+ // Check that compression flag is not ON. We don't support compression yet.
+ // TODO (makdharma): add compression support
+ GPR_ASSERT(op->send_message->flags == 0);
+ gpr_slice_buffer_add(&s->write_slice_buffer, s->slice);
+ if (s->cbs == NULL) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create");
+ }
+ s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
+ GPR_ASSERT(s->cbs);
+ s->read_closed = false;
+ s->response_trailers_received = false;
+ s->response_headers_received = false;
+ s->cronet_send_state = CRONET_SEND_IDLE;
+ s->cronet_recv_state = CRONET_RECV_IDLE;
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]);
+ s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete;
+ next_send_step(s);
+ }
+ if (op->send_trailing_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - send_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]);
+ s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete;
+ if (s->cbs) {
+ // Send an "empty" write to the far end to signal that we're done.
+ // This will induce the server to send down trailers.
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ }
+ cronet_bidirectional_stream_write(s->cbs, "abc", 0, true);
+ } else {
+ // We never created a stream. This was probably an empty request.
+ invoke_closing_callback(s);
+ }
+ }
+}
+
+static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_stream_refcount *refcount,
+ const void *server_data) {
+ stream_obj *s = (stream_obj *)gs;
+ memset(s->callback_list, 0, sizeof(s->callback_list));
+ s->cbs = NULL;
+ gpr_mu_init(&s->recv_mu);
+ s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ gpr_slice_buffer_init(&s->write_slice_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_transport - init_stream");
+ }
+ return 0;
+}
+
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, void *and_free_memory) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "Destroy stream");
+ }
+ stream_obj *s = (stream_obj *)gs;
+ s->cbs = NULL;
+ gpr_free(s->read_buffer);
+ gpr_free(s->write_buffer);
+ gpr_free(s->url);
+ gpr_mu_destroy(&s->recv_mu);
+ if (and_free_memory) {
+ gpr_free(and_free_memory);
+ }
+}
+
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ gpr_free(ct->host);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "Destroy transport");
+ }
+}
+
+const grpc_transport_vtable grpc_cronet_vtable = {
+ sizeof(stream_obj), "cronet_http", init_stream,
+ set_pollset_do_nothing, perform_stream_op, NULL,
+ destroy_stream, destroy_transport, NULL};