aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r--src/core/transport/chttp2_transport.c1615
1 files changed, 1615 insertions, 0 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
new file mode 100644
index 0000000000..8a6b427559
--- /dev/null
+++ b/src/core/transport/chttp2_transport.c
@@ -0,0 +1,1615 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/transport/chttp2_transport.h"
+
+#include <math.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string.h>
+#include <grpc/support/useful.h>
+#include "src/core/transport/transport_impl.h"
+#include "src/core/transport/chttp2/http2_errors.h"
+#include "src/core/transport/chttp2/hpack_parser.h"
+#include "src/core/transport/chttp2/frame_data.h"
+#include "src/core/transport/chttp2/frame_ping.h"
+#include "src/core/transport/chttp2/frame_rst_stream.h"
+#include "src/core/transport/chttp2/frame_settings.h"
+#include "src/core/transport/chttp2/frame_window_update.h"
+#include "src/core/transport/chttp2/status_conversion.h"
+#include "src/core/transport/chttp2/stream_encoder.h"
+#include "src/core/transport/chttp2/stream_map.h"
+#include "src/core/transport/chttp2/timeout_encoding.h"
+
+#define DEFAULT_WINDOW 65536
+#define MAX_WINDOW 0x7fffffffu
+
+#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
+#define CLIENT_CONNECT_STRLEN 24
+
+typedef struct transport transport;
+typedef struct stream stream;
+
+/* streams are kept in various linked lists depending on what things need to
+ happen to them... this enum labels each list */
+typedef enum {
+ /* streams that have pending writes */
+ WRITABLE = 0,
+ /* streams that want to send window updates */
+ WINDOW_UPDATE,
+ /* streams that are waiting to start because there are too many concurrent
+ streams on the connection */
+ WAITING_FOR_CONCURRENCY,
+ /* streams that want to callback the application */
+ PENDING_CALLBACKS,
+ /* streams that *ARE* calling back to the application */
+ EXECUTING_CALLBACKS,
+ STREAM_LIST_COUNT /* must be last */
+} stream_list_id;
+
+/* deframer state for the overall http2 stream of bytes */
+typedef enum {
+ /* prefix: one entry per http2 connection prefix byte */
+ DTS_CLIENT_PREFIX_0 = 0,
+ DTS_CLIENT_PREFIX_1,
+ DTS_CLIENT_PREFIX_2,
+ DTS_CLIENT_PREFIX_3,
+ DTS_CLIENT_PREFIX_4,
+ DTS_CLIENT_PREFIX_5,
+ DTS_CLIENT_PREFIX_6,
+ DTS_CLIENT_PREFIX_7,
+ DTS_CLIENT_PREFIX_8,
+ DTS_CLIENT_PREFIX_9,
+ DTS_CLIENT_PREFIX_10,
+ DTS_CLIENT_PREFIX_11,
+ DTS_CLIENT_PREFIX_12,
+ DTS_CLIENT_PREFIX_13,
+ DTS_CLIENT_PREFIX_14,
+ DTS_CLIENT_PREFIX_15,
+ DTS_CLIENT_PREFIX_16,
+ DTS_CLIENT_PREFIX_17,
+ DTS_CLIENT_PREFIX_18,
+ DTS_CLIENT_PREFIX_19,
+ DTS_CLIENT_PREFIX_20,
+ DTS_CLIENT_PREFIX_21,
+ DTS_CLIENT_PREFIX_22,
+ DTS_CLIENT_PREFIX_23,
+ /* frame header byte 0... */
+ /* must follow from the prefix states */
+ DTS_FH_0,
+ DTS_FH_1,
+ DTS_FH_2,
+ DTS_FH_3,
+ DTS_FH_4,
+ DTS_FH_5,
+ DTS_FH_6,
+ DTS_FH_7,
+ /* ... frame header byte 8 */
+ DTS_FH_8,
+ /* inside a http2 frame */
+ DTS_FRAME
+} deframe_transport_state;
+
+typedef struct {
+ stream *head;
+ stream *tail;
+} stream_list;
+
+typedef struct {
+ stream *next;
+ stream *prev;
+} stream_link;
+
+typedef enum {
+ ERROR_STATE_NONE,
+ ERROR_STATE_SEEN,
+ ERROR_STATE_NOTIFIED
+} error_state;
+
+/* We keep several sets of connection wide parameters */
+typedef enum {
+ /* The settings our peer has asked for (and we have acked) */
+ PEER_SETTINGS = 0,
+ /* The settings we'd like to have */
+ LOCAL_SETTINGS,
+ /* The settings we've published to our peer */
+ SENT_SETTINGS,
+ /* The settings the peer has acked */
+ ACKED_SETTINGS,
+ NUM_SETTING_SETS
+} setting_set;
+
+/* Outstanding ping request data */
+typedef struct {
+ gpr_uint8 id[8];
+ void (*cb)(void *user_data);
+ void *user_data;
+} outstanding_ping;
+
+struct transport {
+ grpc_transport base; /* must be first */
+ const grpc_transport_callbacks *cb;
+ void *cb_user_data;
+ grpc_endpoint *ep;
+ grpc_mdctx *metadata_context;
+ gpr_refcount refs;
+ gpr_uint8 is_client;
+
+ gpr_mu mu;
+ gpr_cv cv;
+
+ /* basic state management - what are we doing at the moment? */
+ gpr_uint8 reading;
+ gpr_uint8 writing;
+ gpr_uint8 calling_back;
+ error_state error_state;
+
+ /* stream indexing */
+ gpr_uint32 next_stream_id;
+
+ /* settings */
+ gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
+ gpr_uint8 sent_local_settings;
+ gpr_uint8 dirtied_local_settings;
+
+ /* window management */
+ gpr_uint32 outgoing_window;
+ gpr_uint32 incoming_window;
+
+ /* deframing */
+ deframe_transport_state deframe_state;
+ gpr_uint8 incoming_frame_type;
+ gpr_uint8 incoming_frame_flags;
+ gpr_uint8 header_eof;
+ gpr_uint32 expect_continuation_stream_id;
+ gpr_uint32 incoming_frame_size;
+ gpr_uint32 incoming_stream_id;
+
+ /* hpack encoding */
+ grpc_chttp2_hpack_compressor hpack_compressor;
+
+ /* various parsers */
+ grpc_chttp2_hpack_parser hpack_parser;
+ /* simple one shot parsers */
+ union {
+ grpc_chttp2_window_update_parser window_update;
+ grpc_chttp2_settings_parser settings;
+ grpc_chttp2_ping_parser ping;
+ } simple_parsers;
+
+ /* state for a stream that's not yet been created */
+ grpc_stream_op_buffer new_stream_sopb;
+
+ /* active parser */
+ void *parser_data;
+ stream *incoming_stream;
+ grpc_chttp2_parse_error (*parser)(void *parser_user_data,
+ grpc_chttp2_parse_state *state,
+ gpr_slice slice, int is_last);
+
+ gpr_slice_buffer outbuf;
+ gpr_slice_buffer qbuf;
+
+ stream_list lists[STREAM_LIST_COUNT];
+ grpc_chttp2_stream_map stream_map;
+
+ /* metadata object cache */
+ grpc_mdstr *str_grpc_timeout;
+
+ /* pings */
+ outstanding_ping *pings;
+ size_t ping_count;
+ size_t ping_capacity;
+ gpr_int64 ping_counter;
+};
+
+struct stream {
+ gpr_uint32 id;
+
+ gpr_uint32 outgoing_window;
+ gpr_uint32 incoming_window;
+ gpr_uint8 write_closed;
+ gpr_uint8 read_closed;
+ gpr_uint8 cancelled;
+ gpr_uint8 allow_window_updates;
+ gpr_uint8 published_close;
+
+ stream_link links[STREAM_LIST_COUNT];
+ gpr_uint8 included[STREAM_LIST_COUNT];
+
+ grpc_stream_op_buffer outgoing_sopb;
+
+ grpc_chttp2_data_parser parser;
+
+ grpc_stream_state callback_state;
+ grpc_stream_op_buffer callback_sopb;
+};
+
+static const grpc_transport_vtable vtable;
+
+static void push_setting(transport *t, grpc_chttp2_setting_id id,
+ gpr_uint32 value);
+
+static int prepare_callbacks(transport *t);
+static void run_callbacks(transport *t);
+
+static int prepare_write(transport *t);
+static void finish_write(void *t, grpc_endpoint_cb_status status);
+
+static void lock(transport *t);
+static void unlock(transport *t);
+
+static void drop_connection(transport *t);
+static void end_all_the_calls(transport *t);
+
+static stream *stream_list_remove_head(transport *t, stream_list_id id);
+static void stream_list_remove(transport *t, stream *s, stream_list_id id);
+static void stream_list_add_tail(transport *t, stream *s, stream_list_id id);
+static void stream_list_join(transport *t, stream *s, stream_list_id id);
+
+static void cancel_stream_id(transport *t, gpr_uint32 id,
+ grpc_status_code local_status,
+ grpc_chttp2_error_code error_code, int send_rst);
+static void cancel_stream(transport *t, stream *s,
+ grpc_status_code local_status,
+ grpc_chttp2_error_code error_code, int send_rst);
+static stream *lookup_stream(transport *t, gpr_uint32 id);
+static void remove_from_stream_map(transport *t, stream *s);
+static void maybe_start_some_streams(transport *t);
+
+static void become_skip_parser(transport *t);
+
+/*
+ * CONSTRUCTION/DESTRUCTION/REFCOUNTING
+ */
+
+static void unref_transport(transport *t) {
+ size_t i;
+
+ if (!gpr_unref(&t->refs)) return;
+
+ gpr_mu_lock(&t->mu);
+
+ GPR_ASSERT(t->ep == NULL);
+
+ gpr_slice_buffer_destroy(&t->outbuf);
+ gpr_slice_buffer_destroy(&t->qbuf);
+ grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
+ grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
+
+ grpc_mdstr_unref(t->str_grpc_timeout);
+
+ for (i = 0; i < STREAM_LIST_COUNT; i++) {
+ GPR_ASSERT(t->lists[i].head == NULL);
+ GPR_ASSERT(t->lists[i].tail == NULL);
+ }
+
+ GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
+
+ grpc_chttp2_stream_map_destroy(&t->stream_map);
+
+ gpr_mu_unlock(&t->mu);
+ gpr_mu_destroy(&t->mu);
+
+ /* callback remaining pings: they're not allowed to call into the transpot,
+ and maybe they hold resources that need to be freed */
+ for (i = 0; i < t->ping_count; i++) {
+ t->pings[i].cb(t->pings[i].user_data);
+ }
+ gpr_free(t->pings);
+
+ gpr_free(t);
+}
+
+static void ref_transport(transport *t) { gpr_ref(&t->refs); }
+
+static void init_transport(transport *t, grpc_transport_setup_callback setup,
+ void *arg, const grpc_channel_args *channel_args,
+ grpc_endpoint *ep, grpc_mdctx *mdctx,
+ int is_client) {
+ size_t i;
+ int j;
+ grpc_transport_setup_result sr;
+
+ GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
+
+ t->base.vtable = &vtable;
+ t->ep = ep;
+ /* one ref is for destroy, the other for when ep becomes NULL */
+ gpr_ref_init(&t->refs, 2);
+ gpr_mu_init(&t->mu);
+ gpr_cv_init(&t->cv);
+ t->metadata_context = mdctx;
+ t->str_grpc_timeout =
+ grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
+ t->reading = 1;
+ t->writing = 0;
+ t->error_state = ERROR_STATE_NONE;
+ t->next_stream_id = is_client ? 1 : 2;
+ t->is_client = is_client;
+ t->outgoing_window = DEFAULT_WINDOW;
+ t->incoming_window = DEFAULT_WINDOW;
+ t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
+ t->expect_continuation_stream_id = 0;
+ t->pings = NULL;
+ t->ping_count = 0;
+ t->ping_capacity = 0;
+ t->ping_counter = gpr_now().tv_nsec;
+ grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
+ gpr_slice_buffer_init(&t->outbuf);
+ gpr_slice_buffer_init(&t->qbuf);
+ if (is_client) {
+ gpr_slice_buffer_add(&t->qbuf,
+ gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
+ }
+ /* 8 is a random stab in the dark as to a good initial size: it's small enough
+ that it shouldn't waste memory for infrequently used connections, yet
+ large enough that the exponential growth should happen nicely when it's
+ needed.
+ TODO(ctiller): tune this */
+ grpc_chttp2_stream_map_init(&t->stream_map, 8);
+ memset(&t->lists, 0, sizeof(t->lists));
+
+ /* copy in initial settings to all setting sets */
+ for (i = 0; i < NUM_SETTING_SETS; i++) {
+ for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) {
+ t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
+ }
+ }
+ t->dirtied_local_settings = 1;
+ t->sent_local_settings = 0;
+
+ /* configure http2 the way we like it */
+ if (t->is_client) {
+ push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
+ push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
+ }
+
+ if (channel_args) {
+ for (i = 0; i < channel_args->num_args; i++) {
+ if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
+ if (t->is_client) {
+ gpr_log(GPR_ERROR, "%s: is ignored on the client",
+ GRPC_ARG_MAX_CONCURRENT_STREAMS);
+ } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
+ gpr_log(GPR_ERROR, "%s: must be an integer",
+ GRPC_ARG_MAX_CONCURRENT_STREAMS);
+ } else {
+ push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
+ channel_args->args[i].value.integer);
+ }
+ }
+ }
+ }
+
+ gpr_mu_lock(&t->mu);
+ t->calling_back = 1;
+ ref_transport(t);
+ gpr_mu_unlock(&t->mu);
+
+ sr = setup(arg, &t->base, t->metadata_context);
+
+ lock(t);
+ t->cb = sr.callbacks;
+ t->cb_user_data = sr.user_data;
+ grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context);
+ t->calling_back = 0;
+ gpr_cv_broadcast(&t->cv);
+ unlock(t);
+ unref_transport(t);
+}
+
+static void destroy_transport(grpc_transport *gt) {
+ transport *t = (transport *)gt;
+
+ gpr_mu_lock(&t->mu);
+ while (t->calling_back) {
+ gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
+ }
+ t->cb = NULL;
+ gpr_mu_unlock(&t->mu);
+
+ unref_transport(t);
+}
+
+static void close_transport(grpc_transport *gt) {
+ transport *t = (transport *)gt;
+ gpr_mu_lock(&t->mu);
+ if (t->ep) {
+ grpc_endpoint_shutdown(t->ep);
+ }
+ gpr_mu_unlock(&t->mu);
+}
+
+static int init_stream(grpc_transport *gt, grpc_stream *gs,
+ const void *server_data) {
+ transport *t = (transport *)gt;
+ stream *s = (stream *)gs;
+
+ ref_transport(t);
+
+ if (!server_data) {
+ lock(t);
+ s->id = 0;
+ } else {
+ s->id = (gpr_uint32)(gpr_uintptr)server_data;
+ t->incoming_stream = s;
+ grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ }
+
+ s->outgoing_window = DEFAULT_WINDOW;
+ s->incoming_window = DEFAULT_WINDOW;
+ s->write_closed = 0;
+ s->read_closed = 0;
+ s->cancelled = 0;
+ s->allow_window_updates = 0;
+ s->published_close = 0;
+ memset(&s->links, 0, sizeof(s->links));
+ memset(&s->included, 0, sizeof(s->included));
+ grpc_sopb_init(&s->outgoing_sopb);
+ grpc_chttp2_data_parser_init(&s->parser);
+ grpc_sopb_init(&s->callback_sopb);
+
+ if (!server_data) {
+ unlock(t);
+ }
+
+ return 0;
+}
+
+static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
+ transport *t = (transport *)gt;
+ stream *s = (stream *)gs;
+ size_t i;
+
+ gpr_mu_lock(&t->mu);
+
+ /* await pending callbacks
+ TODO(ctiller): this could be optimized to check if this stream is getting
+ callbacks */
+ while (t->calling_back) {
+ gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
+ }
+
+ /* stop parsing if we're currently parsing this stream */
+ if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id &&
+ s->id != 0) {
+ become_skip_parser(t);
+ }
+
+ for (i = 0; i < STREAM_LIST_COUNT; i++) {
+ stream_list_remove(t, s, i);
+ }
+ remove_from_stream_map(t, s);
+
+ gpr_cv_broadcast(&t->cv);
+ gpr_mu_unlock(&t->mu);
+
+ grpc_sopb_destroy(&s->outgoing_sopb);
+ grpc_chttp2_data_parser_destroy(&s->parser);
+ grpc_sopb_destroy(&s->callback_sopb);
+
+ unref_transport(t);
+}
+
+/*
+ * LIST MANAGEMENT
+ */
+
+static stream *stream_list_remove_head(transport *t, stream_list_id id) {
+ stream *s = t->lists[id].head;
+ if (s) {
+ stream *new_head = s->links[id].next;
+ GPR_ASSERT(s->included[id]);
+ if (new_head) {
+ t->lists[id].head = new_head;
+ new_head->links[id].prev = NULL;
+ } else {
+ t->lists[id].head = NULL;
+ t->lists[id].tail = NULL;
+ }
+ s->included[id] = 0;
+ }
+ return s;
+}
+
+static void stream_list_remove(transport *t, stream *s, stream_list_id id) {
+ if (!s->included[id]) return;
+ s->included[id] = 0;
+ if (s->links[id].prev) {
+ s->links[id].prev->links[id].next = s->links[id].next;
+ } else {
+ GPR_ASSERT(t->lists[id].head == s);
+ t->lists[id].head = s->links[id].next;
+ }
+ if (s->links[id].next) {
+ s->links[id].next->links[id].prev = s->links[id].prev;
+ } else {
+ t->lists[id].tail = s->links[id].prev;
+ }
+}
+
+static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
+ stream *old_tail;
+ GPR_ASSERT(!s->included[id]);
+ old_tail = t->lists[id].tail;
+ s->links[id].next = NULL;
+ s->links[id].prev = old_tail;
+ if (old_tail) {
+ old_tail->links[id].next = s;
+ } else {
+ s->links[id].prev = NULL;
+ t->lists[id].head = s;
+ }
+ t->lists[id].tail = s;
+ s->included[id] = 1;
+}
+
+static void stream_list_join(transport *t, stream *s, stream_list_id id) {
+ if (s->included[id]) {
+ return;
+ }
+ stream_list_add_tail(t, s, id);
+}
+
+static void remove_from_stream_map(transport *t, stream *s) {
+ if (s->id == 0) return;
+ if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
+ maybe_start_some_streams(t);
+ }
+}
+
+/*
+ * LOCK MANAGEMENT
+ */
+
+/* We take a transport-global lock in response to calls coming in from above,
+ and in response to data being received from below. New data to be written
+ is always queued, as are callbacks to process data. During unlock() we
+ check our todo lists and initiate callbacks and flush writes. */
+
+static void lock(transport *t) { gpr_mu_lock(&t->mu); }
+
+static void unlock(transport *t) {
+ int start_write = 0;
+ int perform_callbacks = 0;
+ int call_closed = 0;
+ grpc_endpoint *ep = t->ep;
+
+ /* see if we need to trigger a write - and if so, get the data ready */
+ if (ep && !t->writing) {
+ t->writing = start_write = prepare_write(t);
+ if (start_write) {
+ ref_transport(t);
+ }
+ }
+
+ /* gather any callbacks that need to be made */
+ if (!t->calling_back && t->cb) {
+ perform_callbacks = prepare_callbacks(t);
+ if (perform_callbacks) {
+ t->calling_back = 1;
+ }
+ if (t->error_state == ERROR_STATE_SEEN) {
+ call_closed = 1;
+ t->calling_back = 1;
+ t->error_state = ERROR_STATE_NOTIFIED;
+ }
+ }
+
+ if (perform_callbacks || call_closed) {
+ ref_transport(t);
+ }
+
+ /* finally unlock */
+ gpr_mu_unlock(&t->mu);
+
+ /* perform some callbacks if necessary */
+ if (perform_callbacks) {
+ run_callbacks(t);
+ }
+
+ if (call_closed) {
+ t->cb->closed(t->cb_user_data, &t->base);
+ }
+
+ /* write some bytes if necessary */
+ while (start_write) {
+ switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
+ finish_write, t, gpr_inf_future)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
+ /* grab the lock directly without wrappers since we just want to
+ continue writes if we loop: no need to check read callbacks again */
+ gpr_mu_lock(&t->mu);
+ t->outbuf.count = 0;
+ t->outbuf.length = 0;
+ t->writing = start_write = prepare_write(t);
+ if (!start_write) {
+ if (!t->reading) {
+ grpc_endpoint_destroy(t->ep);
+ t->ep = NULL;
+ gpr_cv_broadcast(&t->cv);
+ /* endpoint ref: safe because we'll still have the ref for write */
+ unref_transport(t);
+ }
+ }
+ gpr_mu_unlock(&t->mu);
+ if (!start_write) {
+ unref_transport(t);
+ }
+ break;
+ case GRPC_ENDPOINT_WRITE_ERROR:
+ start_write = 0;
+ /* use the wrapper lock/unlock here as we drop_connection, causing
+ read callbacks to be queued (which will be cleared during unlock) */
+ lock(t);
+ t->outbuf.count = 0;
+ t->outbuf.length = 0;
+ t->writing = 0;
+ drop_connection(t);
+ if (!t->reading) {
+ grpc_endpoint_destroy(t->ep);
+ t->ep = NULL;
+ gpr_cv_broadcast(&t->cv);
+ /* endpoint ref: safe because we'll still have the ref for write */
+ unref_transport(t);
+ }
+ unlock(t);
+ unref_transport(t);
+ break;
+ case GRPC_ENDPOINT_WRITE_PENDING:
+ start_write = 0;
+ break;
+ }
+ }
+
+ if (perform_callbacks || call_closed) {
+ lock(t);
+ t->calling_back = 0;
+ gpr_cv_broadcast(&t->cv);
+ unlock(t);
+ unref_transport(t);
+ }
+}
+
+/*
+ * OUTPUT PROCESSING
+ */
+
+static void push_setting(transport *t, grpc_chttp2_setting_id id,
+ gpr_uint32 value) {
+ const grpc_chttp2_setting_parameters *sp =
+ &grpc_chttp2_settings_parameters[id];
+ gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
+ if (use_value != value) {
+ gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
+ value, use_value);
+ }
+ if (use_value != t->settings[LOCAL_SETTINGS][id]) {
+ t->settings[LOCAL_SETTINGS][id] = use_value;
+ t->dirtied_local_settings = 1;
+ }
+}
+
+static void finish_write(void *tp, grpc_endpoint_cb_status error) {
+ transport *t = tp;
+
+ lock(t);
+ if (error != GRPC_ENDPOINT_CB_OK) {
+ drop_connection(t);
+ }
+ t->outbuf.count = 0;
+ t->outbuf.length = 0;
+ /* leave the writing flag up on shutdown to prevent further writes in unlock()
+ from starting */
+ t->writing = 0;
+ if (!t->reading) {
+ grpc_endpoint_destroy(t->ep);
+ t->ep = NULL;
+ gpr_cv_broadcast(&t->cv);
+ unref_transport(t); /* safe because we'll still have the ref for write */
+ }
+ unlock(t);
+
+ unref_transport(t);
+}
+
+static int prepare_write(transport *t) {
+ stream *s;
+ gpr_slice_buffer tempbuf;
+
+ /* simple writes are queued to qbuf, and flushed here */
+ tempbuf = t->qbuf;
+ t->qbuf = t->outbuf;
+ t->outbuf = tempbuf;
+ GPR_ASSERT(t->qbuf.count == 0);
+
+ if (t->dirtied_local_settings && !t->sent_local_settings) {
+ gpr_slice_buffer_add(
+ &t->outbuf, grpc_chttp2_settings_create(t->settings[SENT_SETTINGS],
+ t->settings[LOCAL_SETTINGS],
+ GRPC_CHTTP2_NUM_SETTINGS));
+ t->dirtied_local_settings = 0;
+ t->sent_local_settings = 1;
+ }
+
+ /* for each stream that's become writable, frame it's data (according to
+ available window sizes) and add to the output buffer */
+ while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE))) {
+ gpr_uint32 written = grpc_chttp2_encode_some(
+ s->outgoing_sopb.ops, &s->outgoing_sopb.nops, s->write_closed,
+ &t->outbuf, GPR_MIN(t->outgoing_window, s->outgoing_window), s->id,
+ &t->hpack_compressor);
+ t->outgoing_window -= written;
+ s->outgoing_window -= written;
+
+ /* if there are no more writes to do and writes are closed, we need to
+ queue a callback to let the application know */
+ if (s->write_closed && s->outgoing_sopb.nops == 0) {
+ stream_list_join(t, s, PENDING_CALLBACKS);
+ }
+
+ /* if there are still writes to do and the stream still has window
+ available, then schedule a further write */
+ if (s->outgoing_sopb.nops && s->outgoing_window) {
+ GPR_ASSERT(!t->outgoing_window);
+ stream_list_add_tail(t, s, WRITABLE);
+ }
+ }
+
+ /* for each stream that wants to update its window, add that window here */
+ while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
+ gpr_uint32 window_add = DEFAULT_WINDOW - s->incoming_window;
+ if (!s->read_closed && window_add) {
+ gpr_slice_buffer_add(&t->outbuf,
+ grpc_chttp2_window_update_create(s->id, window_add));
+ s->incoming_window += window_add;
+ }
+ }
+
+ /* if the transport is ready to send a window update, do so here also */
+ if (t->incoming_window < DEFAULT_WINDOW / 2) {
+ gpr_uint32 window_add = DEFAULT_WINDOW - t->incoming_window;
+ gpr_slice_buffer_add(&t->outbuf,
+ grpc_chttp2_window_update_create(0, window_add));
+ t->incoming_window += window_add;
+ }
+
+ return t->outbuf.length > 0;
+}
+
+static void maybe_start_some_streams(transport *t) {
+ while (
+ grpc_chttp2_stream_map_size(&t->stream_map) <
+ t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
+ stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
+ if (!s) break;
+
+ GPR_ASSERT(s->id == 0);
+ s->id = t->next_stream_id;
+ t->next_stream_id += 2;
+ grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ stream_list_join(t, s, WRITABLE);
+ }
+}
+
+static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
+ size_t ops_count, int is_last) {
+ transport *t = (transport *)gt;
+ stream *s = (stream *)gs;
+
+ lock(t);
+
+ if (is_last) {
+ s->write_closed = 1;
+ }
+ if (!s->cancelled) {
+ grpc_sopb_append(&s->outgoing_sopb, ops, ops_count);
+ if (is_last && s->outgoing_sopb.nops == 0) {
+ if (s->id != 0) {
+ gpr_slice_buffer_add(&t->qbuf,
+ grpc_chttp2_data_frame_create_empty_close(s->id));
+ }
+ } else if (s->id == 0) {
+ stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
+ maybe_start_some_streams(t);
+ } else if (s->outgoing_window) {
+ stream_list_join(t, s, WRITABLE);
+ }
+ } else {
+ grpc_stream_ops_unref_owned_objects(ops, ops_count);
+ }
+ if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed) {
+ stream_list_join(t, s, PENDING_CALLBACKS);
+ }
+
+ unlock(t);
+}
+
+static void abort_stream(grpc_transport *gt, grpc_stream *gs,
+ grpc_status_code status) {
+ transport *t = (transport *)gt;
+ stream *s = (stream *)gs;
+
+ lock(t);
+ cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status),
+ 1);
+ unlock(t);
+}
+
+static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
+ void *user_data) {
+ transport *t = (transport *)gt;
+ outstanding_ping *p;
+
+ lock(t);
+ if (t->ping_capacity == t->ping_count) {
+ t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2);
+ t->pings =
+ gpr_realloc(t->pings, sizeof(outstanding_ping) * t->ping_capacity);
+ }
+ p = &t->pings[t->ping_count++];
+ p->id[0] = t->ping_counter >> 56;
+ p->id[1] = t->ping_counter >> 48;
+ p->id[2] = t->ping_counter >> 40;
+ p->id[3] = t->ping_counter >> 32;
+ p->id[4] = t->ping_counter >> 24;
+ p->id[5] = t->ping_counter >> 16;
+ p->id[6] = t->ping_counter >> 8;
+ p->id[7] = t->ping_counter;
+ p->cb = cb;
+ p->user_data = user_data;
+ gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
+ unlock(t);
+}
+
+/*
+ * INPUT PROCESSING
+ */
+
+static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
+ grpc_status_code local_status,
+ grpc_chttp2_error_code error_code,
+ int send_rst) {
+ char buffer[32];
+ int had_outgoing;
+
+ if (s) {
+ /* clear out any unreported input & output: nobody cares anymore */
+ grpc_sopb_reset(&s->parser.incoming_sopb);
+ had_outgoing = s->outgoing_sopb.nops != 0;
+ grpc_sopb_reset(&s->outgoing_sopb);
+ if (s->cancelled) {
+ send_rst = 0;
+ } else if (!s->read_closed || !s->write_closed || had_outgoing) {
+ s->cancelled = 1;
+ s->read_closed = 1;
+ s->write_closed = 1;
+
+ sprintf(buffer, "%d", local_status);
+ grpc_sopb_add_metadata(
+ &s->parser.incoming_sopb,
+ grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
+
+ stream_list_join(t, s, PENDING_CALLBACKS);
+ }
+ }
+ if (!id) send_rst = 0;
+ if (send_rst) {
+ gpr_slice_buffer_add(&t->qbuf,
+ grpc_chttp2_rst_stream_create(id, error_code));
+ }
+}
+
+static void cancel_stream_id(transport *t, gpr_uint32 id,
+ grpc_status_code local_status,
+ grpc_chttp2_error_code error_code, int send_rst) {
+ cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
+ send_rst);
+}
+
+static void cancel_stream(transport *t, stream *s,
+ grpc_status_code local_status,
+ grpc_chttp2_error_code error_code, int send_rst) {
+ cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
+}
+
+static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
+ cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
+ GRPC_CHTTP2_INTERNAL_ERROR, 0);
+}
+
+static void end_all_the_calls(transport *t) {
+ grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
+}
+
+static void drop_connection(transport *t) {
+ if (t->error_state == ERROR_STATE_NONE) {
+ t->error_state = ERROR_STATE_SEEN;
+ }
+ end_all_the_calls(t);
+}
+
+static void maybe_join_window_updates(transport *t, stream *s) {
+ if (s->allow_window_updates && s->incoming_window < DEFAULT_WINDOW / 2) {
+ stream_list_join(t, s, WINDOW_UPDATE);
+ }
+}
+
+static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp,
+ int allow) {
+ transport *t = (transport *)tp;
+ stream *s = (stream *)sp;
+
+ lock(t);
+ s->allow_window_updates = allow;
+ if (allow) {
+ maybe_join_window_updates(t, s);
+ } else {
+ stream_list_remove(t, s, WINDOW_UPDATE);
+ }
+ unlock(t);
+}
+
+static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
+ if (t->incoming_frame_size > t->incoming_window) {
+ gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
+ t->incoming_frame_size, t->incoming_window);
+ return GRPC_CHTTP2_CONNECTION_ERROR;
+ }
+
+ if (t->incoming_frame_size > s->incoming_window) {
+ gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
+ t->incoming_frame_size, s->incoming_window);
+ return GRPC_CHTTP2_CONNECTION_ERROR;
+ }
+
+ t->incoming_window -= t->incoming_frame_size;
+ s->incoming_window -= t->incoming_frame_size;
+
+ /* if the stream incoming window is getting low, schedule an update */
+ maybe_join_window_updates(t, s);
+
+ return GRPC_CHTTP2_PARSE_OK;
+}
+
+static stream *lookup_stream(transport *t, gpr_uint32 id) {
+ return grpc_chttp2_stream_map_find(&t->stream_map, id);
+}
+
+static grpc_chttp2_parse_error skip_parser(void *parser,
+ grpc_chttp2_parse_state *st,
+ gpr_slice slice, int is_last) {
+ return GRPC_CHTTP2_PARSE_OK;
+}
+
+static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
+
+static int init_skip_frame(transport *t, int is_header) {
+ if (is_header) {
+ int is_eoh = t->expect_continuation_stream_id != 0;
+ t->parser = grpc_chttp2_header_parser_parse;
+ t->parser_data = &t->hpack_parser;
+ t->hpack_parser.on_header = skip_header;
+ t->hpack_parser.on_header_user_data = NULL;
+ t->hpack_parser.is_boundary = is_eoh;
+ t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
+ } else {
+ t->parser = skip_parser;
+ }
+ return 1;
+}
+
+static void become_skip_parser(transport *t) {
+ init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse);
+}
+
+static int init_data_frame_parser(transport *t) {
+ stream *s = lookup_stream(t, t->incoming_stream_id);
+ grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
+ if (!s || s->read_closed) return init_skip_frame(t, 0);
+ if (err == GRPC_CHTTP2_PARSE_OK) {
+ err = update_incoming_window(t, s);
+ }
+ if (err == GRPC_CHTTP2_PARSE_OK) {
+ err = grpc_chttp2_data_parser_begin_frame(&s->parser,
+ t->incoming_frame_flags);
+ }
+ switch (err) {
+ case GRPC_CHTTP2_PARSE_OK:
+ t->incoming_stream = s;
+ t->parser = grpc_chttp2_data_parser_parse;
+ t->parser_data = &s->parser;
+ return 1;
+ case GRPC_CHTTP2_STREAM_ERROR:
+ cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
+ GRPC_CHTTP2_INTERNAL_ERROR),
+ GRPC_CHTTP2_INTERNAL_ERROR, 1);
+ return init_skip_frame(t, 0);
+ case GRPC_CHTTP2_CONNECTION_ERROR:
+ drop_connection(t);
+ return 0;
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ return 0;
+}
+
+static void free_timeout(void *p) { gpr_free(p); }
+
+static void on_header(void *tp, grpc_mdelem *md) {
+ transport *t = tp;
+ stream *s = t->incoming_stream;
+
+ GPR_ASSERT(s);
+ stream_list_join(t, s, PENDING_CALLBACKS);
+ if (md->key == t->str_grpc_timeout) {
+ gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
+ if (!cached_timeout) {
+ /* not already parsed: parse it now, and store the result away */
+ cached_timeout = gpr_malloc(sizeof(gpr_timespec));
+ if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value),
+ cached_timeout)) {
+ gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
+ grpc_mdstr_as_c_string(md->value));
+ *cached_timeout = gpr_inf_future;
+ }
+ grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
+ }
+ grpc_sopb_add_deadline(&s->parser.incoming_sopb,
+ gpr_time_add(gpr_now(), *cached_timeout));
+ grpc_mdelem_unref(md);
+ } else {
+ grpc_sopb_add_metadata(&s->parser.incoming_sopb, md);
+ }
+}
+
+static int init_header_frame_parser(transport *t, int is_continuation) {
+ int is_eoh =
+ (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
+ stream *s;
+
+ if (is_eoh) {
+ t->expect_continuation_stream_id = 0;
+ } else {
+ t->expect_continuation_stream_id = t->incoming_stream_id;
+ }
+
+ if (!is_continuation) {
+ t->header_eof =
+ (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
+ }
+
+ /* could be a new stream or an existing stream */
+ s = lookup_stream(t, t->incoming_stream_id);
+ if (!s) {
+ if (is_continuation) {
+ gpr_log(GPR_ERROR, "stream disbanded before CONTINUATION received");
+ return init_skip_frame(t, 1);
+ }
+ if (t->is_client) {
+ if ((t->incoming_stream_id & 1) &&
+ t->incoming_stream_id < t->next_stream_id) {
+ /* this is an old (probably cancelled) stream */
+ } else {
+ gpr_log(GPR_ERROR, "ignoring new stream creation on client");
+ }
+ return init_skip_frame(t, 1);
+ }
+ t->incoming_stream = NULL;
+ /* if stream is accepted, we set incoming_stream in init_stream */
+ t->cb->accept_stream(t->cb_user_data, &t->base,
+ (void *)(gpr_uintptr)t->incoming_stream_id);
+ s = t->incoming_stream;
+ if (!s) {
+ gpr_log(GPR_ERROR, "stream not accepted");
+ return init_skip_frame(t, 1);
+ }
+ } else {
+ t->incoming_stream = s;
+ }
+ if (t->incoming_stream->read_closed) {
+ gpr_log(GPR_ERROR, "skipping already closed stream header");
+ t->incoming_stream = NULL;
+ return init_skip_frame(t, 1);
+ }
+ t->parser = grpc_chttp2_header_parser_parse;
+ t->parser_data = &t->hpack_parser;
+ t->hpack_parser.on_header = on_header;
+ t->hpack_parser.on_header_user_data = t;
+ t->hpack_parser.is_boundary = is_eoh;
+ t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
+ if (!is_continuation &&
+ (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
+ grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser);
+ }
+ return 1;
+}
+
+static int init_window_update_frame_parser(transport *t) {
+ int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
+ &t->simple_parsers.window_update,
+ t->incoming_frame_size,
+ t->incoming_frame_flags);
+ if (!ok) {
+ drop_connection(t);
+ }
+ t->parser = grpc_chttp2_window_update_parser_parse;
+ t->parser_data = &t->simple_parsers.window_update;
+ return ok;
+}
+
+static int init_ping_parser(transport *t) {
+ int ok = GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_ping_parser_begin_frame(&t->simple_parsers.ping,
+ t->incoming_frame_size,
+ t->incoming_frame_flags);
+ if (!ok) {
+ drop_connection(t);
+ }
+ t->parser = grpc_chttp2_ping_parser_parse;
+ t->parser_data = &t->simple_parsers.ping;
+ return ok;
+}
+
+static int init_settings_frame_parser(transport *t) {
+ int ok = GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_settings_parser_begin_frame(
+ &t->simple_parsers.settings, t->incoming_frame_size,
+ t->incoming_frame_flags, t->settings[PEER_SETTINGS]);
+ if (!ok) {
+ drop_connection(t);
+ }
+ if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
+ memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS],
+ GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
+ }
+ t->parser = grpc_chttp2_settings_parser_parse;
+ t->parser_data = &t->simple_parsers.settings;
+ return ok;
+}
+
+static int init_frame_parser(transport *t) {
+ if (t->expect_continuation_stream_id != 0) {
+ if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
+ gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
+ t->incoming_frame_type);
+ return 0;
+ }
+ if (t->expect_continuation_stream_id != t->incoming_stream_id) {
+ gpr_log(GPR_ERROR,
+ "Expected CONTINUATION frame for stream %08x, got stream %08x",
+ t->expect_continuation_stream_id, t->incoming_stream_id);
+ return 0;
+ }
+ return init_header_frame_parser(t, 1);
+ }
+ switch (t->incoming_frame_type) {
+ case GRPC_CHTTP2_FRAME_DATA:
+ return init_data_frame_parser(t);
+ case GRPC_CHTTP2_FRAME_HEADER:
+ return init_header_frame_parser(t, 0);
+ case GRPC_CHTTP2_FRAME_CONTINUATION:
+ gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
+ return 0;
+ case GRPC_CHTTP2_FRAME_RST_STREAM:
+ /* TODO(ctiller): actually parse the reason */
+ cancel_stream_id(
+ t, t->incoming_stream_id,
+ grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
+ GRPC_CHTTP2_CANCEL, 0);
+ return init_skip_frame(t, 0);
+ case GRPC_CHTTP2_FRAME_SETTINGS:
+ return init_settings_frame_parser(t);
+ case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
+ return init_window_update_frame_parser(t);
+ case GRPC_CHTTP2_FRAME_PING:
+ return init_ping_parser(t);
+ default:
+ gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
+ return init_skip_frame(t, 0);
+ }
+}
+
+static int is_window_update_legal(gpr_uint32 window_update, gpr_uint32 window) {
+ return window_update < MAX_WINDOW - window;
+}
+
+static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
+ grpc_chttp2_parse_state st;
+ size_t i;
+ memset(&st, 0, sizeof(st));
+ switch (t->parser(t->parser_data, &st, slice, is_last)) {
+ case GRPC_CHTTP2_PARSE_OK:
+ if (st.end_of_stream) {
+ t->incoming_stream->read_closed = 1;
+ stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ }
+ if (st.need_flush_reads) {
+ stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ }
+ if (st.metadata_boundary) {
+ grpc_sopb_add_metadata_boundary(
+ &t->incoming_stream->parser.incoming_sopb);
+ stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ }
+ if (st.ack_settings) {
+ gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
+ maybe_start_some_streams(t);
+ }
+ if (st.send_ping_ack) {
+ gpr_slice_buffer_add(
+ &t->qbuf,
+ grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes));
+ }
+ if (st.process_ping_reply) {
+ for (i = 0; i < t->ping_count; i++) {
+ if (0 ==
+ memcmp(t->pings[i].id, t->simple_parsers.ping.opaque_8bytes, 8)) {
+ t->pings[i].cb(t->pings[i].user_data);
+ memmove(&t->pings[i], &t->pings[i + 1],
+ (t->ping_count - i - 1) * sizeof(outstanding_ping));
+ t->ping_count--;
+ break;
+ }
+ }
+ }
+ if (st.window_update) {
+ if (t->incoming_stream_id) {
+ /* if there was a stream id, this is for some stream */
+ stream *s = lookup_stream(t, t->incoming_stream_id);
+ if (s) {
+ int was_window_empty = s->outgoing_window == 0;
+ if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
+ cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
+ GRPC_CHTTP2_FLOW_CONTROL_ERROR),
+ GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
+ } else {
+ s->outgoing_window += st.window_update;
+ /* if this window update makes outgoing ops writable again,
+ flag that */
+ if (was_window_empty && s->outgoing_sopb.nops) {
+ stream_list_join(t, s, WRITABLE);
+ }
+ }
+ }
+ } else {
+ /* transport level window update */
+ if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
+ drop_connection(t);
+ } else {
+ t->outgoing_window += st.window_update;
+ }
+ }
+ }
+ return 1;
+ case GRPC_CHTTP2_STREAM_ERROR:
+ become_skip_parser(t);
+ cancel_stream_id(
+ t, t->incoming_stream_id,
+ grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
+ GRPC_CHTTP2_INTERNAL_ERROR, 1);
+ return 1;
+ case GRPC_CHTTP2_CONNECTION_ERROR:
+ drop_connection(t);
+ return 0;
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ return 0;
+}
+
+static int process_read(transport *t, gpr_slice slice) {
+ gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
+ gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
+ gpr_uint8 *cur = beg;
+
+ if (cur == end) return 1;
+
+ switch (t->deframe_state) {
+ case DTS_CLIENT_PREFIX_0:
+ case DTS_CLIENT_PREFIX_1:
+ case DTS_CLIENT_PREFIX_2:
+ case DTS_CLIENT_PREFIX_3:
+ case DTS_CLIENT_PREFIX_4:
+ case DTS_CLIENT_PREFIX_5:
+ case DTS_CLIENT_PREFIX_6:
+ case DTS_CLIENT_PREFIX_7:
+ case DTS_CLIENT_PREFIX_8:
+ case DTS_CLIENT_PREFIX_9:
+ case DTS_CLIENT_PREFIX_10:
+ case DTS_CLIENT_PREFIX_11:
+ case DTS_CLIENT_PREFIX_12:
+ case DTS_CLIENT_PREFIX_13:
+ case DTS_CLIENT_PREFIX_14:
+ case DTS_CLIENT_PREFIX_15:
+ case DTS_CLIENT_PREFIX_16:
+ case DTS_CLIENT_PREFIX_17:
+ case DTS_CLIENT_PREFIX_18:
+ case DTS_CLIENT_PREFIX_19:
+ case DTS_CLIENT_PREFIX_20:
+ case DTS_CLIENT_PREFIX_21:
+ case DTS_CLIENT_PREFIX_22:
+ case DTS_CLIENT_PREFIX_23:
+ while (cur != end && t->deframe_state != DTS_FH_0) {
+ if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) {
+ gpr_log(GPR_ERROR,
+ "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
+ "at byte %d",
+ CLIENT_CONNECT_STRING[t->deframe_state],
+ (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur,
+ (int)*cur, t->deframe_state);
+ return 0;
+ }
+ ++cur;
+ ++t->deframe_state;
+ }
+ if (cur == end) {
+ return 1;
+ }
+ /* fallthrough */
+ dts_fh_0:
+ case DTS_FH_0:
+ GPR_ASSERT(cur < end);
+ t->incoming_frame_size = ((gpr_uint32)*cur) << 16;
+ if (++cur == end) {
+ t->deframe_state = DTS_FH_1;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_1:
+ GPR_ASSERT(cur < end);
+ t->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
+ if (++cur == end) {
+ t->deframe_state = DTS_FH_2;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_2:
+ GPR_ASSERT(cur < end);
+ t->incoming_frame_size |= *cur;
+ if (++cur == end) {
+ t->deframe_state = DTS_FH_3;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_3:
+ GPR_ASSERT(cur < end);
+ t->incoming_frame_type = *cur;
+ if (++cur == end) {
+ t->deframe_state = DTS_FH_4;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_4:
+ GPR_ASSERT(cur < end);
+ t->incoming_frame_flags = *cur;
+ if (++cur == end) {
+ t->deframe_state = DTS_FH_5;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_5:
+ GPR_ASSERT(cur < end);
+ t->incoming_stream_id = (((gpr_uint32)*cur) << 24) & 0x7f;
+ if (++cur == end) {
+ t->deframe_state = DTS_FH_6;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_6:
+ GPR_ASSERT(cur < end);
+ t->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
+ if (++cur == end) {
+ t->deframe_state = DTS_FH_7;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_7:
+ GPR_ASSERT(cur < end);
+ t->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
+ if (++cur == end) {
+ t->deframe_state = DTS_FH_8;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_8:
+ GPR_ASSERT(cur < end);
+ t->incoming_stream_id |= ((gpr_uint32)*cur);
+ t->deframe_state = DTS_FRAME;
+ if (!init_frame_parser(t)) {
+ return 0;
+ }
+ if (t->incoming_frame_size == 0) {
+ if (!parse_frame_slice(t, gpr_empty_slice(), 1)) {
+ return 0;
+ }
+ if (++cur == end) {
+ t->deframe_state = DTS_FH_0;
+ return 1;
+ }
+ goto dts_fh_0; /* loop */
+ }
+ if (++cur == end) {
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FRAME:
+ GPR_ASSERT(cur < end);
+ if (end - cur == t->incoming_frame_size) {
+ if (!parse_frame_slice(
+ t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
+ return 0;
+ }
+ t->deframe_state = DTS_FH_0;
+ return 1;
+ } else if (end - cur > t->incoming_frame_size) {
+ if (!parse_frame_slice(
+ t, gpr_slice_sub_no_ref(slice, cur - beg,
+ cur + t->incoming_frame_size - beg),
+ 1)) {
+ return 0;
+ }
+ cur += t->incoming_frame_size;
+ goto dts_fh_0; /* loop */
+ } else {
+ if (!parse_frame_slice(
+ t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
+ return 0;
+ }
+ t->incoming_frame_size -= (end - cur);
+ return 1;
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ }
+
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
+/* tcp read callback */
+static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
+ transport *t = tp;
+ size_t i;
+ int keep_reading = 0;
+
+ switch (error) {
+ case GRPC_ENDPOINT_CB_SHUTDOWN:
+ case GRPC_ENDPOINT_CB_EOF:
+ case GRPC_ENDPOINT_CB_ERROR:
+ case GRPC_ENDPOINT_CB_TIMED_OUT:
+ lock(t);
+ drop_connection(t);
+ t->reading = 0;
+ if (!t->writing && t->ep) {
+ grpc_endpoint_destroy(t->ep);
+ t->ep = NULL;
+ gpr_cv_broadcast(&t->cv);
+ unref_transport(t); /* safe as we still have a ref for read */
+ }
+ unlock(t);
+ unref_transport(t);
+ break;
+ case GRPC_ENDPOINT_CB_OK:
+ lock(t);
+ for (i = 0; i < nslices && process_read(t, slices[i]); i++)
+ ;
+ unlock(t);
+ keep_reading = 1;
+ break;
+ }
+
+ for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
+
+ if (keep_reading) {
+ grpc_endpoint_notify_on_read(t->ep, recv_data, t, gpr_inf_future);
+ }
+}
+
+/*
+ * CALLBACK LOOP
+ */
+
+static grpc_stream_state compute_state(gpr_uint8 write_closed,
+ gpr_uint8 read_closed) {
+ if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
+ if (write_closed) return GRPC_STREAM_SEND_CLOSED;
+ if (read_closed) return GRPC_STREAM_RECV_CLOSED;
+ return GRPC_STREAM_OPEN;
+}
+
+static int prepare_callbacks(transport *t) {
+ stream *s;
+ grpc_stream_op_buffer temp_sopb;
+ int n = 0;
+ while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
+ int execute = 1;
+ temp_sopb = s->parser.incoming_sopb;
+ s->parser.incoming_sopb = s->callback_sopb;
+ s->callback_sopb = temp_sopb;
+
+ s->callback_state = compute_state(
+ s->write_closed && s->outgoing_sopb.nops == 0, s->read_closed);
+ if (s->callback_state == GRPC_STREAM_CLOSED) {
+ remove_from_stream_map(t, s);
+ if (s->published_close) {
+ execute = 0;
+ }
+ s->published_close = 1;
+ }
+
+ if (execute) {
+ stream_list_add_tail(t, s, EXECUTING_CALLBACKS);
+ n = 1;
+ }
+ }
+ return n;
+}
+
+static void run_callbacks(transport *t) {
+ stream *s;
+ while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
+ size_t nops = s->callback_sopb.nops;
+ s->callback_sopb.nops = 0;
+ t->cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
+ s->callback_sopb.ops, nops, s->callback_state);
+ }
+}
+
+/*
+ * INTEGRATION GLUE
+ */
+
+static const grpc_transport_vtable vtable = {
+ sizeof(stream), init_stream, send_batch, set_allow_window_updates,
+ destroy_stream, abort_stream, close_transport, send_ping,
+ destroy_transport};
+
+void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
+ void *arg,
+ const grpc_channel_args *channel_args,
+ grpc_endpoint *ep, gpr_slice *slices,
+ size_t nslices, grpc_mdctx *mdctx,
+ int is_client) {
+ transport *t = gpr_malloc(sizeof(transport));
+ init_transport(t, setup, arg, channel_args, ep, mdctx, is_client);
+ ref_transport(t);
+ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
+}