diff options
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r-- | src/core/transport/chttp2_transport.c | 1615 |
1 files changed, 1615 insertions, 0 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c new file mode 100644 index 0000000000..8a6b427559 --- /dev/null +++ b/src/core/transport/chttp2_transport.c @@ -0,0 +1,1615 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/transport/chttp2_transport.h" + +#include <math.h> +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string.h> +#include <grpc/support/useful.h> +#include "src/core/transport/transport_impl.h" +#include "src/core/transport/chttp2/http2_errors.h" +#include "src/core/transport/chttp2/hpack_parser.h" +#include "src/core/transport/chttp2/frame_data.h" +#include "src/core/transport/chttp2/frame_ping.h" +#include "src/core/transport/chttp2/frame_rst_stream.h" +#include "src/core/transport/chttp2/frame_settings.h" +#include "src/core/transport/chttp2/frame_window_update.h" +#include "src/core/transport/chttp2/status_conversion.h" +#include "src/core/transport/chttp2/stream_encoder.h" +#include "src/core/transport/chttp2/stream_map.h" +#include "src/core/transport/chttp2/timeout_encoding.h" + +#define DEFAULT_WINDOW 65536 +#define MAX_WINDOW 0x7fffffffu + +#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" +#define CLIENT_CONNECT_STRLEN 24 + +typedef struct transport transport; +typedef struct stream stream; + +/* streams are kept in various linked lists depending on what things need to + happen to them... this enum labels each list */ +typedef enum { + /* streams that have pending writes */ + WRITABLE = 0, + /* streams that want to send window updates */ + WINDOW_UPDATE, + /* streams that are waiting to start because there are too many concurrent + streams on the connection */ + WAITING_FOR_CONCURRENCY, + /* streams that want to callback the application */ + PENDING_CALLBACKS, + /* streams that *ARE* calling back to the application */ + EXECUTING_CALLBACKS, + STREAM_LIST_COUNT /* must be last */ +} stream_list_id; + +/* deframer state for the overall http2 stream of bytes */ +typedef enum { + /* prefix: one entry per http2 connection prefix byte */ + DTS_CLIENT_PREFIX_0 = 0, + DTS_CLIENT_PREFIX_1, + DTS_CLIENT_PREFIX_2, + DTS_CLIENT_PREFIX_3, + DTS_CLIENT_PREFIX_4, + DTS_CLIENT_PREFIX_5, + DTS_CLIENT_PREFIX_6, + DTS_CLIENT_PREFIX_7, + DTS_CLIENT_PREFIX_8, + DTS_CLIENT_PREFIX_9, + DTS_CLIENT_PREFIX_10, + DTS_CLIENT_PREFIX_11, + DTS_CLIENT_PREFIX_12, + DTS_CLIENT_PREFIX_13, + DTS_CLIENT_PREFIX_14, + DTS_CLIENT_PREFIX_15, + DTS_CLIENT_PREFIX_16, + DTS_CLIENT_PREFIX_17, + DTS_CLIENT_PREFIX_18, + DTS_CLIENT_PREFIX_19, + DTS_CLIENT_PREFIX_20, + DTS_CLIENT_PREFIX_21, + DTS_CLIENT_PREFIX_22, + DTS_CLIENT_PREFIX_23, + /* frame header byte 0... */ + /* must follow from the prefix states */ + DTS_FH_0, + DTS_FH_1, + DTS_FH_2, + DTS_FH_3, + DTS_FH_4, + DTS_FH_5, + DTS_FH_6, + DTS_FH_7, + /* ... frame header byte 8 */ + DTS_FH_8, + /* inside a http2 frame */ + DTS_FRAME +} deframe_transport_state; + +typedef struct { + stream *head; + stream *tail; +} stream_list; + +typedef struct { + stream *next; + stream *prev; +} stream_link; + +typedef enum { + ERROR_STATE_NONE, + ERROR_STATE_SEEN, + ERROR_STATE_NOTIFIED +} error_state; + +/* We keep several sets of connection wide parameters */ +typedef enum { + /* The settings our peer has asked for (and we have acked) */ + PEER_SETTINGS = 0, + /* The settings we'd like to have */ + LOCAL_SETTINGS, + /* The settings we've published to our peer */ + SENT_SETTINGS, + /* The settings the peer has acked */ + ACKED_SETTINGS, + NUM_SETTING_SETS +} setting_set; + +/* Outstanding ping request data */ +typedef struct { + gpr_uint8 id[8]; + void (*cb)(void *user_data); + void *user_data; +} outstanding_ping; + +struct transport { + grpc_transport base; /* must be first */ + const grpc_transport_callbacks *cb; + void *cb_user_data; + grpc_endpoint *ep; + grpc_mdctx *metadata_context; + gpr_refcount refs; + gpr_uint8 is_client; + + gpr_mu mu; + gpr_cv cv; + + /* basic state management - what are we doing at the moment? */ + gpr_uint8 reading; + gpr_uint8 writing; + gpr_uint8 calling_back; + error_state error_state; + + /* stream indexing */ + gpr_uint32 next_stream_id; + + /* settings */ + gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; + gpr_uint8 sent_local_settings; + gpr_uint8 dirtied_local_settings; + + /* window management */ + gpr_uint32 outgoing_window; + gpr_uint32 incoming_window; + + /* deframing */ + deframe_transport_state deframe_state; + gpr_uint8 incoming_frame_type; + gpr_uint8 incoming_frame_flags; + gpr_uint8 header_eof; + gpr_uint32 expect_continuation_stream_id; + gpr_uint32 incoming_frame_size; + gpr_uint32 incoming_stream_id; + + /* hpack encoding */ + grpc_chttp2_hpack_compressor hpack_compressor; + + /* various parsers */ + grpc_chttp2_hpack_parser hpack_parser; + /* simple one shot parsers */ + union { + grpc_chttp2_window_update_parser window_update; + grpc_chttp2_settings_parser settings; + grpc_chttp2_ping_parser ping; + } simple_parsers; + + /* state for a stream that's not yet been created */ + grpc_stream_op_buffer new_stream_sopb; + + /* active parser */ + void *parser_data; + stream *incoming_stream; + grpc_chttp2_parse_error (*parser)(void *parser_user_data, + grpc_chttp2_parse_state *state, + gpr_slice slice, int is_last); + + gpr_slice_buffer outbuf; + gpr_slice_buffer qbuf; + + stream_list lists[STREAM_LIST_COUNT]; + grpc_chttp2_stream_map stream_map; + + /* metadata object cache */ + grpc_mdstr *str_grpc_timeout; + + /* pings */ + outstanding_ping *pings; + size_t ping_count; + size_t ping_capacity; + gpr_int64 ping_counter; +}; + +struct stream { + gpr_uint32 id; + + gpr_uint32 outgoing_window; + gpr_uint32 incoming_window; + gpr_uint8 write_closed; + gpr_uint8 read_closed; + gpr_uint8 cancelled; + gpr_uint8 allow_window_updates; + gpr_uint8 published_close; + + stream_link links[STREAM_LIST_COUNT]; + gpr_uint8 included[STREAM_LIST_COUNT]; + + grpc_stream_op_buffer outgoing_sopb; + + grpc_chttp2_data_parser parser; + + grpc_stream_state callback_state; + grpc_stream_op_buffer callback_sopb; +}; + +static const grpc_transport_vtable vtable; + +static void push_setting(transport *t, grpc_chttp2_setting_id id, + gpr_uint32 value); + +static int prepare_callbacks(transport *t); +static void run_callbacks(transport *t); + +static int prepare_write(transport *t); +static void finish_write(void *t, grpc_endpoint_cb_status status); + +static void lock(transport *t); +static void unlock(transport *t); + +static void drop_connection(transport *t); +static void end_all_the_calls(transport *t); + +static stream *stream_list_remove_head(transport *t, stream_list_id id); +static void stream_list_remove(transport *t, stream *s, stream_list_id id); +static void stream_list_add_tail(transport *t, stream *s, stream_list_id id); +static void stream_list_join(transport *t, stream *s, stream_list_id id); + +static void cancel_stream_id(transport *t, gpr_uint32 id, + grpc_status_code local_status, + grpc_chttp2_error_code error_code, int send_rst); +static void cancel_stream(transport *t, stream *s, + grpc_status_code local_status, + grpc_chttp2_error_code error_code, int send_rst); +static stream *lookup_stream(transport *t, gpr_uint32 id); +static void remove_from_stream_map(transport *t, stream *s); +static void maybe_start_some_streams(transport *t); + +static void become_skip_parser(transport *t); + +/* + * CONSTRUCTION/DESTRUCTION/REFCOUNTING + */ + +static void unref_transport(transport *t) { + size_t i; + + if (!gpr_unref(&t->refs)) return; + + gpr_mu_lock(&t->mu); + + GPR_ASSERT(t->ep == NULL); + + gpr_slice_buffer_destroy(&t->outbuf); + gpr_slice_buffer_destroy(&t->qbuf); + grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); + grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); + + grpc_mdstr_unref(t->str_grpc_timeout); + + for (i = 0; i < STREAM_LIST_COUNT; i++) { + GPR_ASSERT(t->lists[i].head == NULL); + GPR_ASSERT(t->lists[i].tail == NULL); + } + + GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0); + + grpc_chttp2_stream_map_destroy(&t->stream_map); + + gpr_mu_unlock(&t->mu); + gpr_mu_destroy(&t->mu); + + /* callback remaining pings: they're not allowed to call into the transpot, + and maybe they hold resources that need to be freed */ + for (i = 0; i < t->ping_count; i++) { + t->pings[i].cb(t->pings[i].user_data); + } + gpr_free(t->pings); + + gpr_free(t); +} + +static void ref_transport(transport *t) { gpr_ref(&t->refs); } + +static void init_transport(transport *t, grpc_transport_setup_callback setup, + void *arg, const grpc_channel_args *channel_args, + grpc_endpoint *ep, grpc_mdctx *mdctx, + int is_client) { + size_t i; + int j; + grpc_transport_setup_result sr; + + GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN); + + t->base.vtable = &vtable; + t->ep = ep; + /* one ref is for destroy, the other for when ep becomes NULL */ + gpr_ref_init(&t->refs, 2); + gpr_mu_init(&t->mu); + gpr_cv_init(&t->cv); + t->metadata_context = mdctx; + t->str_grpc_timeout = + grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); + t->reading = 1; + t->writing = 0; + t->error_state = ERROR_STATE_NONE; + t->next_stream_id = is_client ? 1 : 2; + t->is_client = is_client; + t->outgoing_window = DEFAULT_WINDOW; + t->incoming_window = DEFAULT_WINDOW; + t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; + t->expect_continuation_stream_id = 0; + t->pings = NULL; + t->ping_count = 0; + t->ping_capacity = 0; + t->ping_counter = gpr_now().tv_nsec; + grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx); + gpr_slice_buffer_init(&t->outbuf); + gpr_slice_buffer_init(&t->qbuf); + if (is_client) { + gpr_slice_buffer_add(&t->qbuf, + gpr_slice_from_copied_string(CLIENT_CONNECT_STRING)); + } + /* 8 is a random stab in the dark as to a good initial size: it's small enough + that it shouldn't waste memory for infrequently used connections, yet + large enough that the exponential growth should happen nicely when it's + needed. + TODO(ctiller): tune this */ + grpc_chttp2_stream_map_init(&t->stream_map, 8); + memset(&t->lists, 0, sizeof(t->lists)); + + /* copy in initial settings to all setting sets */ + for (i = 0; i < NUM_SETTING_SETS; i++) { + for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) { + t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value; + } + } + t->dirtied_local_settings = 1; + t->sent_local_settings = 0; + + /* configure http2 the way we like it */ + if (t->is_client) { + push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); + push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); + } + + if (channel_args) { + for (i = 0; i < channel_args->num_args; i++) { + if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) { + if (t->is_client) { + gpr_log(GPR_ERROR, "%s: is ignored on the client", + GRPC_ARG_MAX_CONCURRENT_STREAMS); + } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s: must be an integer", + GRPC_ARG_MAX_CONCURRENT_STREAMS); + } else { + push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + channel_args->args[i].value.integer); + } + } + } + } + + gpr_mu_lock(&t->mu); + t->calling_back = 1; + ref_transport(t); + gpr_mu_unlock(&t->mu); + + sr = setup(arg, &t->base, t->metadata_context); + + lock(t); + t->cb = sr.callbacks; + t->cb_user_data = sr.user_data; + grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context); + t->calling_back = 0; + gpr_cv_broadcast(&t->cv); + unlock(t); + unref_transport(t); +} + +static void destroy_transport(grpc_transport *gt) { + transport *t = (transport *)gt; + + gpr_mu_lock(&t->mu); + while (t->calling_back) { + gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); + } + t->cb = NULL; + gpr_mu_unlock(&t->mu); + + unref_transport(t); +} + +static void close_transport(grpc_transport *gt) { + transport *t = (transport *)gt; + gpr_mu_lock(&t->mu); + if (t->ep) { + grpc_endpoint_shutdown(t->ep); + } + gpr_mu_unlock(&t->mu); +} + +static int init_stream(grpc_transport *gt, grpc_stream *gs, + const void *server_data) { + transport *t = (transport *)gt; + stream *s = (stream *)gs; + + ref_transport(t); + + if (!server_data) { + lock(t); + s->id = 0; + } else { + s->id = (gpr_uint32)(gpr_uintptr)server_data; + t->incoming_stream = s; + grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); + } + + s->outgoing_window = DEFAULT_WINDOW; + s->incoming_window = DEFAULT_WINDOW; + s->write_closed = 0; + s->read_closed = 0; + s->cancelled = 0; + s->allow_window_updates = 0; + s->published_close = 0; + memset(&s->links, 0, sizeof(s->links)); + memset(&s->included, 0, sizeof(s->included)); + grpc_sopb_init(&s->outgoing_sopb); + grpc_chttp2_data_parser_init(&s->parser); + grpc_sopb_init(&s->callback_sopb); + + if (!server_data) { + unlock(t); + } + + return 0; +} + +static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { + transport *t = (transport *)gt; + stream *s = (stream *)gs; + size_t i; + + gpr_mu_lock(&t->mu); + + /* await pending callbacks + TODO(ctiller): this could be optimized to check if this stream is getting + callbacks */ + while (t->calling_back) { + gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); + } + + /* stop parsing if we're currently parsing this stream */ + if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id && + s->id != 0) { + become_skip_parser(t); + } + + for (i = 0; i < STREAM_LIST_COUNT; i++) { + stream_list_remove(t, s, i); + } + remove_from_stream_map(t, s); + + gpr_cv_broadcast(&t->cv); + gpr_mu_unlock(&t->mu); + + grpc_sopb_destroy(&s->outgoing_sopb); + grpc_chttp2_data_parser_destroy(&s->parser); + grpc_sopb_destroy(&s->callback_sopb); + + unref_transport(t); +} + +/* + * LIST MANAGEMENT + */ + +static stream *stream_list_remove_head(transport *t, stream_list_id id) { + stream *s = t->lists[id].head; + if (s) { + stream *new_head = s->links[id].next; + GPR_ASSERT(s->included[id]); + if (new_head) { + t->lists[id].head = new_head; + new_head->links[id].prev = NULL; + } else { + t->lists[id].head = NULL; + t->lists[id].tail = NULL; + } + s->included[id] = 0; + } + return s; +} + +static void stream_list_remove(transport *t, stream *s, stream_list_id id) { + if (!s->included[id]) return; + s->included[id] = 0; + if (s->links[id].prev) { + s->links[id].prev->links[id].next = s->links[id].next; + } else { + GPR_ASSERT(t->lists[id].head == s); + t->lists[id].head = s->links[id].next; + } + if (s->links[id].next) { + s->links[id].next->links[id].prev = s->links[id].prev; + } else { + t->lists[id].tail = s->links[id].prev; + } +} + +static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) { + stream *old_tail; + GPR_ASSERT(!s->included[id]); + old_tail = t->lists[id].tail; + s->links[id].next = NULL; + s->links[id].prev = old_tail; + if (old_tail) { + old_tail->links[id].next = s; + } else { + s->links[id].prev = NULL; + t->lists[id].head = s; + } + t->lists[id].tail = s; + s->included[id] = 1; +} + +static void stream_list_join(transport *t, stream *s, stream_list_id id) { + if (s->included[id]) { + return; + } + stream_list_add_tail(t, s, id); +} + +static void remove_from_stream_map(transport *t, stream *s) { + if (s->id == 0) return; + if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) { + maybe_start_some_streams(t); + } +} + +/* + * LOCK MANAGEMENT + */ + +/* We take a transport-global lock in response to calls coming in from above, + and in response to data being received from below. New data to be written + is always queued, as are callbacks to process data. During unlock() we + check our todo lists and initiate callbacks and flush writes. */ + +static void lock(transport *t) { gpr_mu_lock(&t->mu); } + +static void unlock(transport *t) { + int start_write = 0; + int perform_callbacks = 0; + int call_closed = 0; + grpc_endpoint *ep = t->ep; + + /* see if we need to trigger a write - and if so, get the data ready */ + if (ep && !t->writing) { + t->writing = start_write = prepare_write(t); + if (start_write) { + ref_transport(t); + } + } + + /* gather any callbacks that need to be made */ + if (!t->calling_back && t->cb) { + perform_callbacks = prepare_callbacks(t); + if (perform_callbacks) { + t->calling_back = 1; + } + if (t->error_state == ERROR_STATE_SEEN) { + call_closed = 1; + t->calling_back = 1; + t->error_state = ERROR_STATE_NOTIFIED; + } + } + + if (perform_callbacks || call_closed) { + ref_transport(t); + } + + /* finally unlock */ + gpr_mu_unlock(&t->mu); + + /* perform some callbacks if necessary */ + if (perform_callbacks) { + run_callbacks(t); + } + + if (call_closed) { + t->cb->closed(t->cb_user_data, &t->base); + } + + /* write some bytes if necessary */ + while (start_write) { + switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count, + finish_write, t, gpr_inf_future)) { + case GRPC_ENDPOINT_WRITE_DONE: + /* grab the lock directly without wrappers since we just want to + continue writes if we loop: no need to check read callbacks again */ + gpr_mu_lock(&t->mu); + t->outbuf.count = 0; + t->outbuf.length = 0; + t->writing = start_write = prepare_write(t); + if (!start_write) { + if (!t->reading) { + grpc_endpoint_destroy(t->ep); + t->ep = NULL; + gpr_cv_broadcast(&t->cv); + /* endpoint ref: safe because we'll still have the ref for write */ + unref_transport(t); + } + } + gpr_mu_unlock(&t->mu); + if (!start_write) { + unref_transport(t); + } + break; + case GRPC_ENDPOINT_WRITE_ERROR: + start_write = 0; + /* use the wrapper lock/unlock here as we drop_connection, causing + read callbacks to be queued (which will be cleared during unlock) */ + lock(t); + t->outbuf.count = 0; + t->outbuf.length = 0; + t->writing = 0; + drop_connection(t); + if (!t->reading) { + grpc_endpoint_destroy(t->ep); + t->ep = NULL; + gpr_cv_broadcast(&t->cv); + /* endpoint ref: safe because we'll still have the ref for write */ + unref_transport(t); + } + unlock(t); + unref_transport(t); + break; + case GRPC_ENDPOINT_WRITE_PENDING: + start_write = 0; + break; + } + } + + if (perform_callbacks || call_closed) { + lock(t); + t->calling_back = 0; + gpr_cv_broadcast(&t->cv); + unlock(t); + unref_transport(t); + } +} + +/* + * OUTPUT PROCESSING + */ + +static void push_setting(transport *t, grpc_chttp2_setting_id id, + gpr_uint32 value) { + const grpc_chttp2_setting_parameters *sp = + &grpc_chttp2_settings_parameters[id]; + gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value); + if (use_value != value) { + gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name, + value, use_value); + } + if (use_value != t->settings[LOCAL_SETTINGS][id]) { + t->settings[LOCAL_SETTINGS][id] = use_value; + t->dirtied_local_settings = 1; + } +} + +static void finish_write(void *tp, grpc_endpoint_cb_status error) { + transport *t = tp; + + lock(t); + if (error != GRPC_ENDPOINT_CB_OK) { + drop_connection(t); + } + t->outbuf.count = 0; + t->outbuf.length = 0; + /* leave the writing flag up on shutdown to prevent further writes in unlock() + from starting */ + t->writing = 0; + if (!t->reading) { + grpc_endpoint_destroy(t->ep); + t->ep = NULL; + gpr_cv_broadcast(&t->cv); + unref_transport(t); /* safe because we'll still have the ref for write */ + } + unlock(t); + + unref_transport(t); +} + +static int prepare_write(transport *t) { + stream *s; + gpr_slice_buffer tempbuf; + + /* simple writes are queued to qbuf, and flushed here */ + tempbuf = t->qbuf; + t->qbuf = t->outbuf; + t->outbuf = tempbuf; + GPR_ASSERT(t->qbuf.count == 0); + + if (t->dirtied_local_settings && !t->sent_local_settings) { + gpr_slice_buffer_add( + &t->outbuf, grpc_chttp2_settings_create(t->settings[SENT_SETTINGS], + t->settings[LOCAL_SETTINGS], + GRPC_CHTTP2_NUM_SETTINGS)); + t->dirtied_local_settings = 0; + t->sent_local_settings = 1; + } + + /* for each stream that's become writable, frame it's data (according to + available window sizes) and add to the output buffer */ + while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE))) { + gpr_uint32 written = grpc_chttp2_encode_some( + s->outgoing_sopb.ops, &s->outgoing_sopb.nops, s->write_closed, + &t->outbuf, GPR_MIN(t->outgoing_window, s->outgoing_window), s->id, + &t->hpack_compressor); + t->outgoing_window -= written; + s->outgoing_window -= written; + + /* if there are no more writes to do and writes are closed, we need to + queue a callback to let the application know */ + if (s->write_closed && s->outgoing_sopb.nops == 0) { + stream_list_join(t, s, PENDING_CALLBACKS); + } + + /* if there are still writes to do and the stream still has window + available, then schedule a further write */ + if (s->outgoing_sopb.nops && s->outgoing_window) { + GPR_ASSERT(!t->outgoing_window); + stream_list_add_tail(t, s, WRITABLE); + } + } + + /* for each stream that wants to update its window, add that window here */ + while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { + gpr_uint32 window_add = DEFAULT_WINDOW - s->incoming_window; + if (!s->read_closed && window_add) { + gpr_slice_buffer_add(&t->outbuf, + grpc_chttp2_window_update_create(s->id, window_add)); + s->incoming_window += window_add; + } + } + + /* if the transport is ready to send a window update, do so here also */ + if (t->incoming_window < DEFAULT_WINDOW / 2) { + gpr_uint32 window_add = DEFAULT_WINDOW - t->incoming_window; + gpr_slice_buffer_add(&t->outbuf, + grpc_chttp2_window_update_create(0, window_add)); + t->incoming_window += window_add; + } + + return t->outbuf.length > 0; +} + +static void maybe_start_some_streams(transport *t) { + while ( + grpc_chttp2_stream_map_size(&t->stream_map) < + t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { + stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); + if (!s) break; + + GPR_ASSERT(s->id == 0); + s->id = t->next_stream_id; + t->next_stream_id += 2; + grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); + stream_list_join(t, s, WRITABLE); + } +} + +static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, + size_t ops_count, int is_last) { + transport *t = (transport *)gt; + stream *s = (stream *)gs; + + lock(t); + + if (is_last) { + s->write_closed = 1; + } + if (!s->cancelled) { + grpc_sopb_append(&s->outgoing_sopb, ops, ops_count); + if (is_last && s->outgoing_sopb.nops == 0) { + if (s->id != 0) { + gpr_slice_buffer_add(&t->qbuf, + grpc_chttp2_data_frame_create_empty_close(s->id)); + } + } else if (s->id == 0) { + stream_list_join(t, s, WAITING_FOR_CONCURRENCY); + maybe_start_some_streams(t); + } else if (s->outgoing_window) { + stream_list_join(t, s, WRITABLE); + } + } else { + grpc_stream_ops_unref_owned_objects(ops, ops_count); + } + if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed) { + stream_list_join(t, s, PENDING_CALLBACKS); + } + + unlock(t); +} + +static void abort_stream(grpc_transport *gt, grpc_stream *gs, + grpc_status_code status) { + transport *t = (transport *)gt; + stream *s = (stream *)gs; + + lock(t); + cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status), + 1); + unlock(t); +} + +static void send_ping(grpc_transport *gt, void (*cb)(void *user_data), + void *user_data) { + transport *t = (transport *)gt; + outstanding_ping *p; + + lock(t); + if (t->ping_capacity == t->ping_count) { + t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2); + t->pings = + gpr_realloc(t->pings, sizeof(outstanding_ping) * t->ping_capacity); + } + p = &t->pings[t->ping_count++]; + p->id[0] = t->ping_counter >> 56; + p->id[1] = t->ping_counter >> 48; + p->id[2] = t->ping_counter >> 40; + p->id[3] = t->ping_counter >> 32; + p->id[4] = t->ping_counter >> 24; + p->id[5] = t->ping_counter >> 16; + p->id[6] = t->ping_counter >> 8; + p->id[7] = t->ping_counter; + p->cb = cb; + p->user_data = user_data; + gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id)); + unlock(t); +} + +/* + * INPUT PROCESSING + */ + +static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, + grpc_status_code local_status, + grpc_chttp2_error_code error_code, + int send_rst) { + char buffer[32]; + int had_outgoing; + + if (s) { + /* clear out any unreported input & output: nobody cares anymore */ + grpc_sopb_reset(&s->parser.incoming_sopb); + had_outgoing = s->outgoing_sopb.nops != 0; + grpc_sopb_reset(&s->outgoing_sopb); + if (s->cancelled) { + send_rst = 0; + } else if (!s->read_closed || !s->write_closed || had_outgoing) { + s->cancelled = 1; + s->read_closed = 1; + s->write_closed = 1; + + sprintf(buffer, "%d", local_status); + grpc_sopb_add_metadata( + &s->parser.incoming_sopb, + grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); + + stream_list_join(t, s, PENDING_CALLBACKS); + } + } + if (!id) send_rst = 0; + if (send_rst) { + gpr_slice_buffer_add(&t->qbuf, + grpc_chttp2_rst_stream_create(id, error_code)); + } +} + +static void cancel_stream_id(transport *t, gpr_uint32 id, + grpc_status_code local_status, + grpc_chttp2_error_code error_code, int send_rst) { + cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code, + send_rst); +} + +static void cancel_stream(transport *t, stream *s, + grpc_status_code local_status, + grpc_chttp2_error_code error_code, int send_rst) { + cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst); +} + +static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) { + cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE, + GRPC_CHTTP2_INTERNAL_ERROR, 0); +} + +static void end_all_the_calls(transport *t) { + grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t); +} + +static void drop_connection(transport *t) { + if (t->error_state == ERROR_STATE_NONE) { + t->error_state = ERROR_STATE_SEEN; + } + end_all_the_calls(t); +} + +static void maybe_join_window_updates(transport *t, stream *s) { + if (s->allow_window_updates && s->incoming_window < DEFAULT_WINDOW / 2) { + stream_list_join(t, s, WINDOW_UPDATE); + } +} + +static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp, + int allow) { + transport *t = (transport *)tp; + stream *s = (stream *)sp; + + lock(t); + s->allow_window_updates = allow; + if (allow) { + maybe_join_window_updates(t, s); + } else { + stream_list_remove(t, s, WINDOW_UPDATE); + } + unlock(t); +} + +static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { + if (t->incoming_frame_size > t->incoming_window) { + gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", + t->incoming_frame_size, t->incoming_window); + return GRPC_CHTTP2_CONNECTION_ERROR; + } + + if (t->incoming_frame_size > s->incoming_window) { + gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", + t->incoming_frame_size, s->incoming_window); + return GRPC_CHTTP2_CONNECTION_ERROR; + } + + t->incoming_window -= t->incoming_frame_size; + s->incoming_window -= t->incoming_frame_size; + + /* if the stream incoming window is getting low, schedule an update */ + maybe_join_window_updates(t, s); + + return GRPC_CHTTP2_PARSE_OK; +} + +static stream *lookup_stream(transport *t, gpr_uint32 id) { + return grpc_chttp2_stream_map_find(&t->stream_map, id); +} + +static grpc_chttp2_parse_error skip_parser(void *parser, + grpc_chttp2_parse_state *st, + gpr_slice slice, int is_last) { + return GRPC_CHTTP2_PARSE_OK; +} + +static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); } + +static int init_skip_frame(transport *t, int is_header) { + if (is_header) { + int is_eoh = t->expect_continuation_stream_id != 0; + t->parser = grpc_chttp2_header_parser_parse; + t->parser_data = &t->hpack_parser; + t->hpack_parser.on_header = skip_header; + t->hpack_parser.on_header_user_data = NULL; + t->hpack_parser.is_boundary = is_eoh; + t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0; + } else { + t->parser = skip_parser; + } + return 1; +} + +static void become_skip_parser(transport *t) { + init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse); +} + +static int init_data_frame_parser(transport *t) { + stream *s = lookup_stream(t, t->incoming_stream_id); + grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK; + if (!s || s->read_closed) return init_skip_frame(t, 0); + if (err == GRPC_CHTTP2_PARSE_OK) { + err = update_incoming_window(t, s); + } + if (err == GRPC_CHTTP2_PARSE_OK) { + err = grpc_chttp2_data_parser_begin_frame(&s->parser, + t->incoming_frame_flags); + } + switch (err) { + case GRPC_CHTTP2_PARSE_OK: + t->incoming_stream = s; + t->parser = grpc_chttp2_data_parser_parse; + t->parser_data = &s->parser; + return 1; + case GRPC_CHTTP2_STREAM_ERROR: + cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( + GRPC_CHTTP2_INTERNAL_ERROR), + GRPC_CHTTP2_INTERNAL_ERROR, 1); + return init_skip_frame(t, 0); + case GRPC_CHTTP2_CONNECTION_ERROR: + drop_connection(t); + return 0; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + return 0; +} + +static void free_timeout(void *p) { gpr_free(p); } + +static void on_header(void *tp, grpc_mdelem *md) { + transport *t = tp; + stream *s = t->incoming_stream; + + GPR_ASSERT(s); + stream_list_join(t, s, PENDING_CALLBACKS); + if (md->key == t->str_grpc_timeout) { + gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); + if (!cached_timeout) { + /* not already parsed: parse it now, and store the result away */ + cached_timeout = gpr_malloc(sizeof(gpr_timespec)); + if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value), + cached_timeout)) { + gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", + grpc_mdstr_as_c_string(md->value)); + *cached_timeout = gpr_inf_future; + } + grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); + } + grpc_sopb_add_deadline(&s->parser.incoming_sopb, + gpr_time_add(gpr_now(), *cached_timeout)); + grpc_mdelem_unref(md); + } else { + grpc_sopb_add_metadata(&s->parser.incoming_sopb, md); + } +} + +static int init_header_frame_parser(transport *t, int is_continuation) { + int is_eoh = + (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; + stream *s; + + if (is_eoh) { + t->expect_continuation_stream_id = 0; + } else { + t->expect_continuation_stream_id = t->incoming_stream_id; + } + + if (!is_continuation) { + t->header_eof = + (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; + } + + /* could be a new stream or an existing stream */ + s = lookup_stream(t, t->incoming_stream_id); + if (!s) { + if (is_continuation) { + gpr_log(GPR_ERROR, "stream disbanded before CONTINUATION received"); + return init_skip_frame(t, 1); + } + if (t->is_client) { + if ((t->incoming_stream_id & 1) && + t->incoming_stream_id < t->next_stream_id) { + /* this is an old (probably cancelled) stream */ + } else { + gpr_log(GPR_ERROR, "ignoring new stream creation on client"); + } + return init_skip_frame(t, 1); + } + t->incoming_stream = NULL; + /* if stream is accepted, we set incoming_stream in init_stream */ + t->cb->accept_stream(t->cb_user_data, &t->base, + (void *)(gpr_uintptr)t->incoming_stream_id); + s = t->incoming_stream; + if (!s) { + gpr_log(GPR_ERROR, "stream not accepted"); + return init_skip_frame(t, 1); + } + } else { + t->incoming_stream = s; + } + if (t->incoming_stream->read_closed) { + gpr_log(GPR_ERROR, "skipping already closed stream header"); + t->incoming_stream = NULL; + return init_skip_frame(t, 1); + } + t->parser = grpc_chttp2_header_parser_parse; + t->parser_data = &t->hpack_parser; + t->hpack_parser.on_header = on_header; + t->hpack_parser.on_header_user_data = t; + t->hpack_parser.is_boundary = is_eoh; + t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0; + if (!is_continuation && + (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) { + grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser); + } + return 1; +} + +static int init_window_update_frame_parser(transport *t) { + int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame( + &t->simple_parsers.window_update, + t->incoming_frame_size, + t->incoming_frame_flags); + if (!ok) { + drop_connection(t); + } + t->parser = grpc_chttp2_window_update_parser_parse; + t->parser_data = &t->simple_parsers.window_update; + return ok; +} + +static int init_ping_parser(transport *t) { + int ok = GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_ping_parser_begin_frame(&t->simple_parsers.ping, + t->incoming_frame_size, + t->incoming_frame_flags); + if (!ok) { + drop_connection(t); + } + t->parser = grpc_chttp2_ping_parser_parse; + t->parser_data = &t->simple_parsers.ping; + return ok; +} + +static int init_settings_frame_parser(transport *t) { + int ok = GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_settings_parser_begin_frame( + &t->simple_parsers.settings, t->incoming_frame_size, + t->incoming_frame_flags, t->settings[PEER_SETTINGS]); + if (!ok) { + drop_connection(t); + } + if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { + memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS], + GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32)); + } + t->parser = grpc_chttp2_settings_parser_parse; + t->parser_data = &t->simple_parsers.settings; + return ok; +} + +static int init_frame_parser(transport *t) { + if (t->expect_continuation_stream_id != 0) { + if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) { + gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x", + t->incoming_frame_type); + return 0; + } + if (t->expect_continuation_stream_id != t->incoming_stream_id) { + gpr_log(GPR_ERROR, + "Expected CONTINUATION frame for stream %08x, got stream %08x", + t->expect_continuation_stream_id, t->incoming_stream_id); + return 0; + } + return init_header_frame_parser(t, 1); + } + switch (t->incoming_frame_type) { + case GRPC_CHTTP2_FRAME_DATA: + return init_data_frame_parser(t); + case GRPC_CHTTP2_FRAME_HEADER: + return init_header_frame_parser(t, 0); + case GRPC_CHTTP2_FRAME_CONTINUATION: + gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame"); + return 0; + case GRPC_CHTTP2_FRAME_RST_STREAM: + /* TODO(ctiller): actually parse the reason */ + cancel_stream_id( + t, t->incoming_stream_id, + grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL), + GRPC_CHTTP2_CANCEL, 0); + return init_skip_frame(t, 0); + case GRPC_CHTTP2_FRAME_SETTINGS: + return init_settings_frame_parser(t); + case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: + return init_window_update_frame_parser(t); + case GRPC_CHTTP2_FRAME_PING: + return init_ping_parser(t); + default: + gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type); + return init_skip_frame(t, 0); + } +} + +static int is_window_update_legal(gpr_uint32 window_update, gpr_uint32 window) { + return window_update < MAX_WINDOW - window; +} + +static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { + grpc_chttp2_parse_state st; + size_t i; + memset(&st, 0, sizeof(st)); + switch (t->parser(t->parser_data, &st, slice, is_last)) { + case GRPC_CHTTP2_PARSE_OK: + if (st.end_of_stream) { + t->incoming_stream->read_closed = 1; + stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + } + if (st.need_flush_reads) { + stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + } + if (st.metadata_boundary) { + grpc_sopb_add_metadata_boundary( + &t->incoming_stream->parser.incoming_sopb); + stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + } + if (st.ack_settings) { + gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); + maybe_start_some_streams(t); + } + if (st.send_ping_ack) { + gpr_slice_buffer_add( + &t->qbuf, + grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes)); + } + if (st.process_ping_reply) { + for (i = 0; i < t->ping_count; i++) { + if (0 == + memcmp(t->pings[i].id, t->simple_parsers.ping.opaque_8bytes, 8)) { + t->pings[i].cb(t->pings[i].user_data); + memmove(&t->pings[i], &t->pings[i + 1], + (t->ping_count - i - 1) * sizeof(outstanding_ping)); + t->ping_count--; + break; + } + } + } + if (st.window_update) { + if (t->incoming_stream_id) { + /* if there was a stream id, this is for some stream */ + stream *s = lookup_stream(t, t->incoming_stream_id); + if (s) { + int was_window_empty = s->outgoing_window == 0; + if (!is_window_update_legal(st.window_update, s->outgoing_window)) { + cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( + GRPC_CHTTP2_FLOW_CONTROL_ERROR), + GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1); + } else { + s->outgoing_window += st.window_update; + /* if this window update makes outgoing ops writable again, + flag that */ + if (was_window_empty && s->outgoing_sopb.nops) { + stream_list_join(t, s, WRITABLE); + } + } + } + } else { + /* transport level window update */ + if (!is_window_update_legal(st.window_update, t->outgoing_window)) { + drop_connection(t); + } else { + t->outgoing_window += st.window_update; + } + } + } + return 1; + case GRPC_CHTTP2_STREAM_ERROR: + become_skip_parser(t); + cancel_stream_id( + t, t->incoming_stream_id, + grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR), + GRPC_CHTTP2_INTERNAL_ERROR, 1); + return 1; + case GRPC_CHTTP2_CONNECTION_ERROR: + drop_connection(t); + return 0; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + return 0; +} + +static int process_read(transport *t, gpr_slice slice) { + gpr_uint8 *beg = GPR_SLICE_START_PTR(slice); + gpr_uint8 *end = GPR_SLICE_END_PTR(slice); + gpr_uint8 *cur = beg; + + if (cur == end) return 1; + + switch (t->deframe_state) { + case DTS_CLIENT_PREFIX_0: + case DTS_CLIENT_PREFIX_1: + case DTS_CLIENT_PREFIX_2: + case DTS_CLIENT_PREFIX_3: + case DTS_CLIENT_PREFIX_4: + case DTS_CLIENT_PREFIX_5: + case DTS_CLIENT_PREFIX_6: + case DTS_CLIENT_PREFIX_7: + case DTS_CLIENT_PREFIX_8: + case DTS_CLIENT_PREFIX_9: + case DTS_CLIENT_PREFIX_10: + case DTS_CLIENT_PREFIX_11: + case DTS_CLIENT_PREFIX_12: + case DTS_CLIENT_PREFIX_13: + case DTS_CLIENT_PREFIX_14: + case DTS_CLIENT_PREFIX_15: + case DTS_CLIENT_PREFIX_16: + case DTS_CLIENT_PREFIX_17: + case DTS_CLIENT_PREFIX_18: + case DTS_CLIENT_PREFIX_19: + case DTS_CLIENT_PREFIX_20: + case DTS_CLIENT_PREFIX_21: + case DTS_CLIENT_PREFIX_22: + case DTS_CLIENT_PREFIX_23: + while (cur != end && t->deframe_state != DTS_FH_0) { + if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) { + gpr_log(GPR_ERROR, + "Connect string mismatch: expected '%c' (%d) got '%c' (%d) " + "at byte %d", + CLIENT_CONNECT_STRING[t->deframe_state], + (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur, + (int)*cur, t->deframe_state); + return 0; + } + ++cur; + ++t->deframe_state; + } + if (cur == end) { + return 1; + } + /* fallthrough */ + dts_fh_0: + case DTS_FH_0: + GPR_ASSERT(cur < end); + t->incoming_frame_size = ((gpr_uint32)*cur) << 16; + if (++cur == end) { + t->deframe_state = DTS_FH_1; + return 1; + } + /* fallthrough */ + case DTS_FH_1: + GPR_ASSERT(cur < end); + t->incoming_frame_size |= ((gpr_uint32)*cur) << 8; + if (++cur == end) { + t->deframe_state = DTS_FH_2; + return 1; + } + /* fallthrough */ + case DTS_FH_2: + GPR_ASSERT(cur < end); + t->incoming_frame_size |= *cur; + if (++cur == end) { + t->deframe_state = DTS_FH_3; + return 1; + } + /* fallthrough */ + case DTS_FH_3: + GPR_ASSERT(cur < end); + t->incoming_frame_type = *cur; + if (++cur == end) { + t->deframe_state = DTS_FH_4; + return 1; + } + /* fallthrough */ + case DTS_FH_4: + GPR_ASSERT(cur < end); + t->incoming_frame_flags = *cur; + if (++cur == end) { + t->deframe_state = DTS_FH_5; + return 1; + } + /* fallthrough */ + case DTS_FH_5: + GPR_ASSERT(cur < end); + t->incoming_stream_id = (((gpr_uint32)*cur) << 24) & 0x7f; + if (++cur == end) { + t->deframe_state = DTS_FH_6; + return 1; + } + /* fallthrough */ + case DTS_FH_6: + GPR_ASSERT(cur < end); + t->incoming_stream_id |= ((gpr_uint32)*cur) << 16; + if (++cur == end) { + t->deframe_state = DTS_FH_7; + return 1; + } + /* fallthrough */ + case DTS_FH_7: + GPR_ASSERT(cur < end); + t->incoming_stream_id |= ((gpr_uint32)*cur) << 8; + if (++cur == end) { + t->deframe_state = DTS_FH_8; + return 1; + } + /* fallthrough */ + case DTS_FH_8: + GPR_ASSERT(cur < end); + t->incoming_stream_id |= ((gpr_uint32)*cur); + t->deframe_state = DTS_FRAME; + if (!init_frame_parser(t)) { + return 0; + } + if (t->incoming_frame_size == 0) { + if (!parse_frame_slice(t, gpr_empty_slice(), 1)) { + return 0; + } + if (++cur == end) { + t->deframe_state = DTS_FH_0; + return 1; + } + goto dts_fh_0; /* loop */ + } + if (++cur == end) { + return 1; + } + /* fallthrough */ + case DTS_FRAME: + GPR_ASSERT(cur < end); + if (end - cur == t->incoming_frame_size) { + if (!parse_frame_slice( + t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) { + return 0; + } + t->deframe_state = DTS_FH_0; + return 1; + } else if (end - cur > t->incoming_frame_size) { + if (!parse_frame_slice( + t, gpr_slice_sub_no_ref(slice, cur - beg, + cur + t->incoming_frame_size - beg), + 1)) { + return 0; + } + cur += t->incoming_frame_size; + goto dts_fh_0; /* loop */ + } else { + if (!parse_frame_slice( + t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) { + return 0; + } + t->incoming_frame_size -= (end - cur); + return 1; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + } + + gpr_log(GPR_ERROR, "should never reach here"); + abort(); +} + +/* tcp read callback */ +static void recv_data(void *tp, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { + transport *t = tp; + size_t i; + int keep_reading = 0; + + switch (error) { + case GRPC_ENDPOINT_CB_SHUTDOWN: + case GRPC_ENDPOINT_CB_EOF: + case GRPC_ENDPOINT_CB_ERROR: + case GRPC_ENDPOINT_CB_TIMED_OUT: + lock(t); + drop_connection(t); + t->reading = 0; + if (!t->writing && t->ep) { + grpc_endpoint_destroy(t->ep); + t->ep = NULL; + gpr_cv_broadcast(&t->cv); + unref_transport(t); /* safe as we still have a ref for read */ + } + unlock(t); + unref_transport(t); + break; + case GRPC_ENDPOINT_CB_OK: + lock(t); + for (i = 0; i < nslices && process_read(t, slices[i]); i++) + ; + unlock(t); + keep_reading = 1; + break; + } + + for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]); + + if (keep_reading) { + grpc_endpoint_notify_on_read(t->ep, recv_data, t, gpr_inf_future); + } +} + +/* + * CALLBACK LOOP + */ + +static grpc_stream_state compute_state(gpr_uint8 write_closed, + gpr_uint8 read_closed) { + if (write_closed && read_closed) return GRPC_STREAM_CLOSED; + if (write_closed) return GRPC_STREAM_SEND_CLOSED; + if (read_closed) return GRPC_STREAM_RECV_CLOSED; + return GRPC_STREAM_OPEN; +} + +static int prepare_callbacks(transport *t) { + stream *s; + grpc_stream_op_buffer temp_sopb; + int n = 0; + while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) { + int execute = 1; + temp_sopb = s->parser.incoming_sopb; + s->parser.incoming_sopb = s->callback_sopb; + s->callback_sopb = temp_sopb; + + s->callback_state = compute_state( + s->write_closed && s->outgoing_sopb.nops == 0, s->read_closed); + if (s->callback_state == GRPC_STREAM_CLOSED) { + remove_from_stream_map(t, s); + if (s->published_close) { + execute = 0; + } + s->published_close = 1; + } + + if (execute) { + stream_list_add_tail(t, s, EXECUTING_CALLBACKS); + n = 1; + } + } + return n; +} + +static void run_callbacks(transport *t) { + stream *s; + while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) { + size_t nops = s->callback_sopb.nops; + s->callback_sopb.nops = 0; + t->cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s, + s->callback_sopb.ops, nops, s->callback_state); + } +} + +/* + * INTEGRATION GLUE + */ + +static const grpc_transport_vtable vtable = { + sizeof(stream), init_stream, send_batch, set_allow_window_updates, + destroy_stream, abort_stream, close_transport, send_ping, + destroy_transport}; + +void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, + void *arg, + const grpc_channel_args *channel_args, + grpc_endpoint *ep, gpr_slice *slices, + size_t nslices, grpc_mdctx *mdctx, + int is_client) { + transport *t = gpr_malloc(sizeof(transport)); + init_transport(t, setup, arg, channel_args, ep, mdctx, is_client); + ref_transport(t); + recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); +} |