aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Makarand Dharmapurikar <makarandd@google.com>2016-04-19 10:31:25 -0700
committerGravatar Makarand Dharmapurikar <makarandd@google.com>2016-05-01 15:08:46 -0700
commita49b13bc332a38dfcf83162003cbaa0a1384143b (patch)
tree0ac81109d7220dc1c6c86149199e9bd33e8f4758 /src/core/ext
parent6b1afe1ad9e6eda6864627f8ad1f42ee088ae4ed (diff)
cronet wrapper code
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/transport/cronet/client/secure/cronet_channel_create.c72
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_c_for_grpc.h202
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c586
3 files changed, 860 insertions, 0 deletions
diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
new file mode 100644
index 0000000000..23189809f7
--- /dev/null
+++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/transport_impl.h"
+
+#ifdef COMPILE_WITH_CRONET
+// Cronet transport object
+struct grpc_cronet_transport {
+ grpc_transport base; /* must be first element in this structure */
+ void *engine;
+ char *host;
+};
+
+typedef struct grpc_cronet_transport grpc_cronet_transport;
+
+extern grpc_transport_vtable cronet_vtable;
+
+GRPCAPI grpc_channel *grpc_custom_secure_channel_create(
+ void *engine, const char *target,
+ const grpc_channel_args *args, void *reserved) {
+ grpc_cronet_transport *ct = gpr_malloc(sizeof(grpc_cronet_transport));
+ ct->base.vtable = &cronet_vtable;
+ ct->engine = engine;
+ ct->host = gpr_malloc(strlen(target) + 1);
+ strcpy(ct->host, target);
+ gpr_log(
+ GPR_DEBUG, "grpc_create_cronet_transport: cronet_engine = %p, target=%s",
+ engine, ct->host);
+
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ return grpc_channel_create(&exec_ctx, target, args,
+ GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct);
+}
+#endif // COMPILE_WITH_CRONET
diff --git a/src/core/ext/transport/cronet/transport/cronet_c_for_grpc.h b/src/core/ext/transport/cronet/transport/cronet_c_for_grpc.h
new file mode 100644
index 0000000000..15a511aebd
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_c_for_grpc.h
@@ -0,0 +1,202 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COMPONENTS_CRONET_IOS_CRONET_C_FOR_GRPC_H_
+#define COMPONENTS_CRONET_IOS_CRONET_C_FOR_GRPC_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stddef.h>
+
+/* Cronet Engine API. */
+
+/* Opaque object representing Cronet Engine. Created and configured outside
+ * of this API to facilitate sharing with other components */
+typedef struct cronet_engine { void* obj; } cronet_engine;
+
+void cronet_engine_add_quic_hint(cronet_engine* engine,
+ const char* host,
+ int port,
+ int alternate_port);
+
+/* Cronet Bidirectional Stream API */
+
+/* Opaque object representing Cronet Bidirectional Stream. */
+typedef struct cronet_bidirectional_stream {
+ void* obj;
+ void* annotation;
+} cronet_bidirectional_stream;
+
+/* A single request or response header element. */
+typedef struct cronet_bidirectional_stream_header {
+ const char* key;
+ const char* value;
+} cronet_bidirectional_stream_header;
+
+/* Array of request or response headers or trailers. */
+typedef struct cronet_bidirectional_stream_header_array {
+ size_t count;
+ size_t capacity;
+ cronet_bidirectional_stream_header* headers;
+} cronet_bidirectional_stream_header_array;
+
+/* Set of callbacks used to receive callbacks from bidirectional stream. */
+typedef struct cronet_bidirectional_stream_callback {
+ /* Invoked when request headers are sent. Indicates that stream has initiated
+ * the request. Consumer may call cronet_bidirectional_stream_write() to start
+ * writing data.
+ */
+ void (*on_request_headers_sent)(cronet_bidirectional_stream* stream);
+
+ /* Invoked when initial response headers are received.
+ * Consumer must call cronet_bidirectional_stream_read() to start reading.
+ * Consumer may call cronet_bidirectional_stream_write() to start writing or
+ * close the stream. Contents of |headers| is valid for duration of the call.
+ */
+ void (*on_response_headers_received)(
+ cronet_bidirectional_stream* stream,
+ const cronet_bidirectional_stream_header_array* headers,
+ const char* negotiated_protocol);
+
+ /* Invoked when data is read into the buffer passed to
+ * cronet_bidirectional_stream_read(). Only part of the buffer may be
+ * populated. To continue reading, call cronet_bidirectional_stream_read().
+ * It may be invoked after on_response_trailers_received()}, if there was
+ * pending read data before trailers were received.
+ *
+ * If count is 0, it means the remote side has signaled that it will send no
+ * more data; future calls to cronet_bidirectional_stream_read() will result
+ * in the on_data_read() callback or on_succeded() callback if
+ * cronet_bidirectional_stream_write() was invoked with end_of_stream set to
+ * true.
+ */
+ void (*on_read_completed)(cronet_bidirectional_stream* stream,
+ char* data,
+ int count);
+
+ /**
+ * Invoked when all data passed to cronet_bidirectional_stream_write() is
+ * sent.
+ * To continue writing, call cronet_bidirectional_stream_write().
+ */
+ void (*on_write_completed)(cronet_bidirectional_stream* stream,
+ const char* data);
+
+ /* Invoked when trailers are received before closing the stream. Only invoked
+ * when server sends trailers, which it may not. May be invoked while there is
+ * read data remaining in local buffer. Contents of |trailers| is valid for
+ * duration of the call.
+ */
+ void (*on_response_trailers_received)(
+ cronet_bidirectional_stream* stream,
+ const cronet_bidirectional_stream_header_array* trailers);
+
+ /**
+ * Invoked when there is no data to be read or written and the stream is
+ * closed successfully remotely and locally. Once invoked, no further callback
+ * methods will be invoked.
+ */
+ void (*on_succeded)(cronet_bidirectional_stream* stream);
+
+ /**
+ * Invoked if the stream failed for any reason after
+ * cronet_bidirectional_stream_start(). HTTP/2 error codes are
+ * mapped to chrome net error codes. Once invoked, no further callback methods
+ * will be invoked.
+ */
+ void (*on_failed)(cronet_bidirectional_stream* stream, int net_error);
+
+ /**
+ * Invoked if the stream was canceled via
+ * cronet_bidirectional_stream_cancel(). Once invoked, no further callback
+ * methods will be invoked.
+ */
+ void (*on_canceled)(cronet_bidirectional_stream* stream);
+} cronet_bidirectional_stream_callback;
+
+/* Create a new stream object that uses |engine| and |callback|. All stream
+ * tasks are performed asynchronously on the |engine| network thread. |callback|
+ * methods are invoked synchronously on the |engine| network thread, but must
+ * not run tasks on the current thread to prevent blocking networking operations
+ * and causing exceptions during shutdown. The |annotation| is stored in
+ * bidirectional stream for arbitrary use by application.
+ *
+ * Returned |cronet_bidirectional_stream*| is owned by the caller, and must be
+ * destroyed using |cronet_bidirectional_stream_destroy|.
+ *
+ * Both |calback| and |engine| must remain valid until stream is destroyed.
+ */
+cronet_bidirectional_stream* cronet_bidirectional_stream_create(
+ cronet_engine* engine,
+ void* annotation,
+ cronet_bidirectional_stream_callback* callback);
+
+/* TBD: The following methods return int. Should it be a custom type? */
+
+/* Destroy stream object. Destroy could be called from any thread, including
+ * network thread, but is posted, so |stream| is valid until calling task is
+ * complete.
+ */
+int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream);
+
+/* Start the stream by sending request to |url| using |method| and |headers|. If
+ * |end_of_stream| is true, then no data is expected to be written.
+ */
+int cronet_bidirectional_stream_start(
+ cronet_bidirectional_stream* stream,
+ const char* url,
+ int priority,
+ const char* method,
+ const cronet_bidirectional_stream_header_array* headers,
+ bool end_of_stream);
+
+/* Read response data into |buffer| of |capacity| length. Must only be called at
+ * most once in response to each invocation of the
+ * on_response_headers_received() and on_read_completed() methods of the
+ * cronet_bidirectional_stream_callback.
+ * Each call will result in an invocation of one of the callback's
+ * on_read_completed method if data is read, its on_succeeded() method if
+ * the stream is closed, or its on_failed() method if there's an error.
+ */
+int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream,
+ char* buffer,
+ int capacity);
+
+/* Read response data into |buffer| of |capacity| length. Must only be called at
+ * most once in response to each invocation of the
+ * on_response_headers_received() and on_read_completed() methods of the
+ * cronet_bidirectional_stream_callback.
+ * Each call will result in an invocation of one of the callback's
+ * on_read_completed method if data is read, its on_succeeded() method if
+ * the stream is closed, or its on_failed() method if there's an error.
+ */
+int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
+ const char* buffer,
+ int count,
+ bool end_of_stream);
+
+/* Cancels the stream. Can be called at any time after
+ * cronet_bidirectional_stream_start(). The on_canceled() method of
+ * cronet_bidirectional_stream_callback will be invoked when cancelation
+ * is complete and no further callback methods will be invoked. If the
+ * stream has completed or has not started, calling
+ * cronet_bidirectional_stream_cancel() has no effect and on_canceled() will not
+ * be invoked. At most one callback method may be invoked after
+ * cronet_bidirectional_stream_cancel() has completed.
+ */
+int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream);
+
+/* Returns true if the |stream| was successfully started and is now done
+ * (succeeded, canceled, or failed).
+ * Returns false if the |stream| stream is not yet started or is in progress.
+ */
+bool cronet_bidirectional_stream_is_done(cronet_bidirectional_stream* stream);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // COMPONENTS_CRONET_IOS_CRONET_C_FOR_GRPC_H_
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
new file mode 100644
index 0000000000..3b9b1b08d8
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -0,0 +1,586 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/impl/codegen/port_platform.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/transport_impl.h"
+#include "src/core/ext/transport/cronet/transport/cronet_c_for_grpc.h"
+
+#ifdef COMPILE_WITH_CRONET
+
+#define GRPC_HEADER_SIZE_IN_BYTES 5
+#define MAX_HDRS 100
+
+#define GRPC_CRONET_TRACE(...) \
+ { \
+ if (grpc_cronet_trace) gpr_log(__VA_ARGS__); \
+ }
+#define CRONET_READ(...) \
+ { \
+ GRPC_CRONET_TRACE(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); \
+ cronet_bidirectional_stream_read(__VA_ARGS__); \
+ }
+#define SET_RECV_STATE(STATE) \
+ { \
+ GRPC_CRONET_TRACE(GPR_DEBUG, "next_state = %s", recv_state_name[STATE]); \
+ cronet_recv_state = STATE; \
+ }
+
+// Global flag that gets set with GRPC_TRACE env variable
+int grpc_cronet_trace = 1;
+
+// Cronet transport object
+struct grpc_cronet_transport {
+ grpc_transport base; /* must be first element in this structure */
+ cronet_engine *engine;
+ const char *host;
+};
+
+typedef struct grpc_cronet_transport grpc_cronet_transport;
+
+enum send_state {
+ CRONET_SEND_IDLE = 0,
+ CRONET_REQ_STARTED,
+ CRONET_SEND_HEADER,
+ CRONET_WRITE,
+ CRONET_WRITE_COMPLETED,
+};
+
+enum recv_state {
+ CRONET_RECV_IDLE = 0,
+ CRONET_RECV_READ_LENGTH,
+ CRONET_RECV_READ_DATA,
+ CRONET_RECV_CLOSED,
+};
+
+const char *recv_state_name[] = {"CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH",
+ "CRONET_RECV_READ_DATA,",
+ "CRONET_RECV_CLOSED"};
+
+// Enum that identifies calling function.
+enum e_caller {
+ PERFORM_STREAM_OP,
+ ON_READ_COMPLETE,
+ ON_RESPONSE_HEADERS_RECEIVED,
+ ON_RESPONSE_TRAILERS_RECEIVED
+};
+
+enum callback_id {
+ CB_SEND_INITIAL_METADATA = 0,
+ CB_SEND_MESSAGE,
+ CB_SEND_TRAILING_METADATA,
+ CB_RECV_MESSAGE,
+ CB_RECV_INITIAL_METADATA,
+ CB_RECV_TRAILING_METADATA,
+ CB_NUM_CALLBACKS
+};
+
+struct stream_obj {
+ // we store received bytes here as they trickle in.
+ gpr_slice_buffer write_slicebuffer;
+ cronet_bidirectional_stream *cbs;
+ gpr_slice slice;
+ gpr_slice_buffer read_slicebuffer;
+ struct grpc_slice_buffer_stream sbs;
+ char *read_buffer;
+ uint32_t remaining_read_bytes;
+ uint32_t total_read_bytes;
+
+ char *write_buffer;
+ size_t write_buffer_size;
+
+ //
+ char *url;
+ char *host;
+
+ bool response_headers_received;
+ bool read_requested;
+ bool response_trailers_received;
+ bool read_closed;
+
+ // Recv message stuff
+ grpc_byte_buffer **recv_message;
+ // Initial metadata stuff
+ grpc_metadata_batch *recv_initial_metadata;
+ // Trailing metadata stuff
+ grpc_metadata_batch *recv_trailing_metadata;
+ grpc_chttp2_incoming_metadata_buffer imb;
+
+ // This mutex protects receive state machine execution
+ gpr_mu recv_mu;
+ // we can queue up up to 2 callbacks for each OP
+ grpc_closure *callback_list[CB_NUM_CALLBACKS][2];
+
+ // storage for header
+ cronet_bidirectional_stream_header headers[MAX_HDRS];
+ uint32_t num_headers;
+ cronet_bidirectional_stream_header_array header_array;
+};
+
+typedef struct stream_obj stream_obj;
+
+void next_send_step(stream_obj *s);
+void next_recv_step(stream_obj *s, enum e_caller caller);
+
+enum send_state cronet_send_state;
+enum recv_state cronet_recv_state;
+
+static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset *pollset) {}
+
+void enqueue_callbacks(grpc_closure *callback_list[]) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ if (callback_list[0]) {
+ // GRPC_CRONET_TRACE(GPR_DEBUG, "enqueuing callback = %p",
+ // callback_list[0]);
+ grpc_exec_ctx_enqueue(&exec_ctx, callback_list[0], true, NULL);
+ callback_list[0] = NULL;
+ }
+ if (callback_list[1]) {
+ // GRPC_CRONET_TRACE(GPR_DEBUG, "enqueuing callback = %p",
+ // callback_list[1]);
+ grpc_exec_ctx_enqueue(&exec_ctx, callback_list[1], true, NULL);
+ callback_list[1] = NULL;
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+void on_canceled(cronet_bidirectional_stream *stream) {
+ GRPC_CRONET_TRACE(GPR_DEBUG, "on_canceled %p", stream);
+}
+void on_failed(cronet_bidirectional_stream *stream, int net_error) {
+ GRPC_CRONET_TRACE(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error);
+}
+void on_succeded(cronet_bidirectional_stream *stream) {
+ GRPC_CRONET_TRACE(GPR_DEBUG, "on_succeeded %p", stream);
+}
+void on_response_trailers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *trailers) {
+ GRPC_CRONET_TRACE(GPR_DEBUG, "R: on_response_trailers_received");
+ stream_obj *s = (stream_obj *)stream->annotation;
+
+ memset(&s->imb, 0, sizeof(s->imb));
+ grpc_chttp2_incoming_metadata_buffer_init(&s->imb);
+ int i = 0;
+ for (i = 0; i < trailers->count; i++) {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->imb, grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(trailers->headers[i].key),
+ grpc_mdstr_from_string(trailers->headers[i].value)));
+ }
+ s->response_trailers_received = true;
+ next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED);
+}
+void on_write_completed(cronet_bidirectional_stream *stream,
+ const char *data) {
+ GRPC_CRONET_TRACE(GPR_DEBUG, "W: on_write_completed");
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]);
+ cronet_send_state = CRONET_WRITE_COMPLETED;
+ next_send_step(s);
+}
+
+void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
+ gpr_slice read_data_slice = gpr_slice_malloc(s->total_read_bytes);
+ uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
+ memcpy(dst_p, recv_data, s->total_read_bytes);
+ gpr_slice_buffer_add(&s->read_slicebuffer, read_data_slice);
+ grpc_slice_buffer_stream_init(&s->sbs, &s->read_slicebuffer, 0);
+ *s->recv_message = (grpc_byte_buffer *)&s->sbs;
+}
+
+int parse_grpc_header(const uint8_t *data) {
+ const uint8_t *p = data + 1;
+ uint32_t length = 0;
+ length |= ((uint8_t)*p++) << 24;
+ length |= ((uint8_t)*p++) << 16;
+ length |= ((uint8_t)*p++) << 8;
+ length |= ((uint8_t)*p++);
+ return length;
+}
+
+void on_read_completed(cronet_bidirectional_stream *stream, char *data,
+ int count) {
+ stream_obj *s = (stream_obj *)stream->annotation;
+ GRPC_CRONET_TRACE(GPR_DEBUG,
+ "R: on_read_completed count=%d, total=%d, remaining=%d",
+ count, s->total_read_bytes, s->remaining_read_bytes);
+ if (count > 0) {
+ GPR_ASSERT(s->recv_message);
+ s->remaining_read_bytes -= count;
+ next_recv_step(s, ON_READ_COMPLETE);
+ } else {
+ s->read_closed = true;
+ next_recv_step(s, ON_READ_COMPLETE);
+ }
+}
+
+void on_response_headers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *headers,
+ const char *negotiated_protocol) {
+ GRPC_CRONET_TRACE(GPR_DEBUG, "R: on_response_headers_received");
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]);
+ s->response_headers_received = true;
+ next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED);
+}
+
+void on_request_headers_sent(cronet_bidirectional_stream *stream) {
+ GRPC_CRONET_TRACE(GPR_DEBUG, "W: on_request_headers_sent");
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]);
+ cronet_send_state = CRONET_SEND_HEADER;
+ next_send_step(s);
+}
+
+// Callback function pointers (invoked by cronet in response to events)
+cronet_bidirectional_stream_callback callbacks = {on_request_headers_sent,
+ on_response_headers_received,
+ on_read_completed,
+ on_write_completed,
+ on_response_trailers_received,
+ on_succeded,
+ on_failed,
+ on_canceled};
+
+
+void invoke_closing_callback(stream_obj *s) {
+ grpc_chttp2_incoming_metadata_buffer_publish(&s->imb,
+ s->recv_trailing_metadata);
+ if (s->callback_list[CB_RECV_TRAILING_METADATA]) {
+ enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]);
+ }
+}
+
+// This is invoked from perform_stream_op, and all on_xxxx callbacks.
+void next_recv_step(stream_obj *s, enum e_caller caller) {
+ gpr_mu_lock(&s->recv_mu);
+ switch (cronet_recv_state) {
+ case CRONET_RECV_IDLE:
+ GRPC_CRONET_TRACE(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE");
+ if (caller == PERFORM_STREAM_OP ||
+ caller == ON_RESPONSE_HEADERS_RECEIVED) {
+ if (s->read_closed && s->response_trailers_received) {
+ invoke_closing_callback(s);
+ SET_RECV_STATE(CRONET_RECV_CLOSED);
+ } else if (s->response_headers_received == true &&
+ s->read_requested == true) {
+ SET_RECV_STATE(CRONET_RECV_READ_LENGTH);
+ s->total_read_bytes = s->remaining_read_bytes =
+ GRPC_HEADER_SIZE_IN_BYTES;
+ GPR_ASSERT(s->read_buffer);
+ CRONET_READ(s->cbs, s->read_buffer, s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_LENGTH:
+ GRPC_CRONET_TRACE(GPR_DEBUG,
+ "cronet_recv_state = CRONET_RECV_READ_LENGTH");
+ if (caller == ON_READ_COMPLETE) {
+ if (s->read_closed) {
+ invoke_closing_callback(s);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ SET_RECV_STATE(CRONET_RECV_CLOSED);
+ } else {
+ GPR_ASSERT(s->remaining_read_bytes == 0);
+ SET_RECV_STATE(CRONET_RECV_READ_DATA);
+ s->total_read_bytes = s->remaining_read_bytes =
+ parse_grpc_header(s->read_buffer);
+ s->read_buffer = gpr_realloc(s->read_buffer, s->remaining_read_bytes);
+ GPR_ASSERT(s->read_buffer);
+ CRONET_READ(s->cbs, (char *)s->read_buffer, s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_DATA:
+ GRPC_CRONET_TRACE(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA");
+ if (caller == ON_READ_COMPLETE) {
+ if (s->remaining_read_bytes > 0) {
+ int offset = s->total_read_bytes - s->remaining_read_bytes;
+ GPR_ASSERT(s->read_buffer);
+ CRONET_READ(s->cbs, (char *)s->read_buffer + offset,
+ s->remaining_read_bytes);
+ } else {
+ gpr_slice_buffer_init(&s->read_slicebuffer);
+ uint8_t *p = s->read_buffer;
+ process_recv_message(s, p);
+ SET_RECV_STATE(CRONET_RECV_IDLE);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ }
+ }
+ break;
+ case CRONET_RECV_CLOSED:
+ break;
+ default:
+ GPR_ASSERT(0); // Should not reach here
+ break;
+ }
+ gpr_mu_unlock(&s->recv_mu);
+}
+
+
+// This function takes the data from s->write_slicebuffer and assembles into
+// a contiguous byte stream with 5 byte gRPC header prepended.
+void create_grpc_frame(stream_obj *s) {
+ gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slicebuffer);
+ uint8_t *raw_data = GPR_SLICE_START_PTR(slice);
+ size_t length = GPR_SLICE_LENGTH(slice);
+ s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
+ s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size);
+ uint8_t *p = s->write_buffer;
+ // Append 5 byte header
+ *p++ = 0;
+ *p++ = (uint8_t)(length >> 24);
+ *p++ = (uint8_t)(length >> 16);
+ *p++ = (uint8_t)(length >> 8);
+ *p++ = (uint8_t)(length);
+ // append actual data
+ memcpy(p, raw_data, length);
+}
+
+void do_write(stream_obj *s) {
+ gpr_slice_buffer *sb = &s->write_slicebuffer;
+ GPR_ASSERT(sb->count <= 1);
+ if (sb->count > 0) {
+ create_grpc_frame(s);
+ GRPC_CRONET_TRACE(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ cronet_bidirectional_stream_write(s->cbs, s->write_buffer,
+ (int)s->write_buffer_size,
+ false);
+ }
+}
+
+//
+void next_send_step(stream_obj *s) {
+ switch(cronet_send_state) {
+ case CRONET_SEND_IDLE:
+ GPR_ASSERT(s->cbs); // cronet_bidirectional_stream is not initialized yet.
+ cronet_send_state = CRONET_REQ_STARTED;
+ GRPC_CRONET_TRACE(GPR_DEBUG, "cronet_bidirectional_stream_start to %s",
+ s->url);
+ cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST",
+ &s->header_array, false);
+ break;
+ case CRONET_SEND_HEADER:
+ do_write(s);
+ cronet_send_state = CRONET_WRITE;
+ break;
+ case CRONET_WRITE_COMPLETED:
+ do_write(s);
+ break;
+ default:
+ GPR_ASSERT(0);
+ break;
+ }
+}
+
+void create_url(const char *path, const char *host, stream_obj *s) {
+ const char prefix[] = "https://";
+ s->url = gpr_malloc(strlen(prefix) + strlen(host) + strlen(path) + 1);
+ strcpy(s->url, prefix);
+ strcat(s->url, host);
+ strcat(s->url, path);
+}
+
+static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head,
+ const char *host,
+ stream_obj *s) {
+ grpc_linked_mdelem *curr = head;
+ while (s->num_headers < MAX_HDRS) {
+ grpc_mdelem *mdelem = curr->md;
+ curr = curr->next;
+ const char *key = grpc_mdstr_as_c_string(mdelem->key);
+ const char *value = grpc_mdstr_as_c_string(mdelem->value);
+ if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
+ strcmp(key, ":authority") == 0) {
+ // Cronet populates these fields on its own.
+ continue;
+ }
+ if (strcmp(key, ":path") == 0) {
+ // Create URL by appending :path value to the hostname
+ create_url(value, host, s);
+ GRPC_CRONET_TRACE(GPR_DEBUG, "extracted URL = %s", s->url);
+ continue;
+ }
+ s->headers[s->num_headers].key = key;
+ s->headers[s->num_headers].value = value;
+ s->num_headers++;
+ if (curr == NULL) {
+ break;
+ }
+ }
+}
+
+static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ GPR_ASSERT(ct->engine);
+ stream_obj *s = (stream_obj *)gs;
+ if (op->recv_trailing_metadata) {
+ GRPC_CRONET_TRACE(
+ GPR_DEBUG, "perform_stream_op - recv_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ s->recv_trailing_metadata = op->recv_trailing_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]);
+ s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete;
+ }
+ if (op->recv_message) {
+ GRPC_CRONET_TRACE(GPR_DEBUG,
+ "perform_stream_op - recv_message: on_complete=%p",
+ op->on_complete);
+ s->recv_message = op->recv_message;
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]);
+ s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready;
+ s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete;
+ s->read_requested = true;
+ next_recv_step(s, PERFORM_STREAM_OP);
+ }
+ if (op->recv_initial_metadata) {
+ GRPC_CRONET_TRACE(GPR_DEBUG,
+ "perform_stream_op - recv_initial_metadata:=%p",
+ op->on_complete);
+ s->recv_initial_metadata = op->recv_initial_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]);
+ s->callback_list[CB_RECV_INITIAL_METADATA][0] =
+ op->recv_initial_metadata_ready;
+ s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete;
+ }
+ if (op->send_initial_metadata) {
+ GRPC_CRONET_TRACE(
+ GPR_DEBUG, "perform_stream_op - send_initial_metadata: on_complete=%p",
+ op->on_complete);
+ s->num_headers = 0;
+ convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head,
+ ct->host, s);
+ s->header_array.count = s->num_headers;
+ s->header_array.capacity = s->num_headers;
+ s->header_array.headers = s->headers;
+ GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]);
+ s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete;
+ }
+ if (op->send_message) {
+ GRPC_CRONET_TRACE(GPR_DEBUG,
+ "perform_stream_op - send_message: on_complete=%p",
+ op->on_complete);
+ grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice,
+ op->send_message->length, NULL);
+ gpr_slice_buffer_add(&s->write_slicebuffer, s->slice);
+ if (s->cbs == NULL) {
+ GRPC_CRONET_TRACE(GPR_DEBUG, "cronet_bidirectional_stream_create");
+ s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
+ GPR_ASSERT(s->cbs);
+ s->read_closed = false;
+ s->response_trailers_received = false;
+ s->response_headers_received = false;
+ cronet_send_state = CRONET_SEND_IDLE;
+ cronet_recv_state = CRONET_RECV_IDLE;
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]);
+ s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete;
+ next_send_step(s);
+ }
+ if (op->send_trailing_metadata) {
+ GRPC_CRONET_TRACE(
+ GPR_DEBUG, "perform_stream_op - send_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]);
+ s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete;
+ if (s->cbs) {
+ // Send an "empty" write to the far end to signal that we're done.
+ // This will induce the server to send down trailers.
+ GRPC_CRONET_TRACE(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ cronet_bidirectional_stream_write(s->cbs, "abc", 0, true);
+ } else {
+ // We never created a stream. This was probably an empty request.
+ invoke_closing_callback(s);
+ }
+ }
+}
+
+static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_stream_refcount *refcount,
+ const void *server_data) {
+ stream_obj *s = (stream_obj *)gs;
+ memset(s->callback_list, 0, sizeof(s->callback_list));
+ s->cbs = NULL;
+ gpr_mu_init(&s->recv_mu);
+ s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ gpr_slice_buffer_init(&s->write_slicebuffer);
+ GRPC_CRONET_TRACE(GPR_DEBUG, "cronet_transport - init_stream");
+ return 0;
+}
+
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs) {
+ GRPC_CRONET_TRACE(GPR_DEBUG, "Destroy stream");
+ stream_obj *s = (stream_obj *)gs;
+ s->cbs = NULL;
+ gpr_free(s->read_buffer);
+ gpr_free(s->write_buffer);
+ gpr_mu_destroy(&s->recv_mu);
+}
+
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ gpr_free(ct->host);
+ GRPC_CRONET_TRACE(GPR_DEBUG, "Destroy transport");
+}
+
+const grpc_transport_vtable cronet_vtable = {sizeof(stream_obj),
+ "cronet_http",
+ init_stream,
+ set_pollset_do_nothing,
+ perform_stream_op,
+ destroy_stream,
+ destroy_transport,
+ NULL,
+ NULL};
+#endif // COMPILE_WITH_CRONET