/* * * Copyright 2014, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ #include "src/core/transport/chttp2_transport.h" #include #include #include #include #include #include #include #include #include "src/core/transport/transport_impl.h" #include "src/core/transport/chttp2/http2_errors.h" #include "src/core/transport/chttp2/hpack_parser.h" #include "src/core/transport/chttp2/frame_data.h" #include "src/core/transport/chttp2/frame_ping.h" #include "src/core/transport/chttp2/frame_rst_stream.h" #include "src/core/transport/chttp2/frame_settings.h" #include "src/core/transport/chttp2/frame_window_update.h" #include "src/core/transport/chttp2/status_conversion.h" #include "src/core/transport/chttp2/stream_encoder.h" #include "src/core/transport/chttp2/stream_map.h" #include "src/core/transport/chttp2/timeout_encoding.h" #define DEFAULT_WINDOW 65536 #define MAX_WINDOW 0x7fffffffu #define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define CLIENT_CONNECT_STRLEN 24 typedef struct transport transport; typedef struct stream stream; /* streams are kept in various linked lists depending on what things need to happen to them... this enum labels each list */ typedef enum { /* streams that have pending writes */ WRITABLE = 0, /* streams that want to send window updates */ WINDOW_UPDATE, /* streams that are waiting to start because there are too many concurrent streams on the connection */ WAITING_FOR_CONCURRENCY, /* streams that want to callback the application */ PENDING_CALLBACKS, /* streams that *ARE* calling back to the application */ EXECUTING_CALLBACKS, STREAM_LIST_COUNT /* must be last */ } stream_list_id; /* deframer state for the overall http2 stream of bytes */ typedef enum { /* prefix: one entry per http2 connection prefix byte */ DTS_CLIENT_PREFIX_0 = 0, DTS_CLIENT_PREFIX_1, DTS_CLIENT_PREFIX_2, DTS_CLIENT_PREFIX_3, DTS_CLIENT_PREFIX_4, DTS_CLIENT_PREFIX_5, DTS_CLIENT_PREFIX_6, DTS_CLIENT_PREFIX_7, DTS_CLIENT_PREFIX_8, DTS_CLIENT_PREFIX_9, DTS_CLIENT_PREFIX_10, DTS_CLIENT_PREFIX_11, DTS_CLIENT_PREFIX_12, DTS_CLIENT_PREFIX_13, DTS_CLIENT_PREFIX_14, DTS_CLIENT_PREFIX_15, DTS_CLIENT_PREFIX_16, DTS_CLIENT_PREFIX_17, DTS_CLIENT_PREFIX_18, DTS_CLIENT_PREFIX_19, DTS_CLIENT_PREFIX_20, DTS_CLIENT_PREFIX_21, DTS_CLIENT_PREFIX_22, DTS_CLIENT_PREFIX_23, /* frame header byte 0... */ /* must follow from the prefix states */ DTS_FH_0, DTS_FH_1, DTS_FH_2, DTS_FH_3, DTS_FH_4, DTS_FH_5, DTS_FH_6, DTS_FH_7, /* ... frame header byte 8 */ DTS_FH_8, /* inside a http2 frame */ DTS_FRAME } deframe_transport_state; typedef struct { stream *head; stream *tail; } stream_list; typedef struct { stream *next; stream *prev; } stream_link; typedef enum { ERROR_STATE_NONE, ERROR_STATE_SEEN, ERROR_STATE_NOTIFIED } error_state; /* We keep several sets of connection wide parameters */ typedef enum { /* The settings our peer has asked for (and we have acked) */ PEER_SETTINGS = 0, /* The settings we'd like to have */ LOCAL_SETTINGS, /* The settings we've published to our peer */ SENT_SETTINGS, /* The settings the peer has acked */ ACKED_SETTINGS, NUM_SETTING_SETS } setting_set; /* Outstanding ping request data */ typedef struct { gpr_uint8 id[8]; void (*cb)(void *user_data); void *user_data; } outstanding_ping; struct transport { grpc_transport base; /* must be first */ const grpc_transport_callbacks *cb; void *cb_user_data; grpc_endpoint *ep; grpc_mdctx *metadata_context; gpr_refcount refs; gpr_uint8 is_client; gpr_mu mu; gpr_cv cv; /* basic state management - what are we doing at the moment? */ gpr_uint8 reading; gpr_uint8 writing; gpr_uint8 calling_back; error_state error_state; /* stream indexing */ gpr_uint32 next_stream_id; /* settings */ gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; gpr_uint8 sent_local_settings; gpr_uint8 dirtied_local_settings; /* window management */ gpr_uint32 outgoing_window; gpr_uint32 incoming_window; /* deframing */ deframe_transport_state deframe_state; gpr_uint8 incoming_frame_type; gpr_uint8 incoming_frame_flags; gpr_uint8 header_eof; gpr_uint32 expect_continuation_stream_id; gpr_uint32 incoming_frame_size; gpr_uint32 incoming_stream_id; /* hpack encoding */ grpc_chttp2_hpack_compressor hpack_compressor; /* various parsers */ grpc_chttp2_hpack_parser hpack_parser; /* simple one shot parsers */ union { grpc_chttp2_window_update_parser window_update; grpc_chttp2_settings_parser settings; grpc_chttp2_ping_parser ping; } simple_parsers; /* state for a stream that's not yet been created */ grpc_stream_op_buffer new_stream_sopb; /* active parser */ void *parser_data; stream *incoming_stream; grpc_chttp2_parse_error (*parser)(void *parser_user_data, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); gpr_slice_buffer outbuf; gpr_slice_buffer qbuf; stream_list lists[STREAM_LIST_COUNT]; grpc_chttp2_stream_map stream_map; /* metadata object cache */ grpc_mdstr *str_grpc_timeout; /* pings */ outstanding_ping *pings; size_t ping_count; size_t ping_capacity; gpr_int64 ping_counter; }; struct stream { gpr_uint32 id; gpr_uint32 outgoing_window; gpr_uint32 incoming_window; gpr_uint8 write_closed; gpr_uint8 read_closed; gpr_uint8 cancelled; gpr_uint8 allow_window_updates; gpr_uint8 published_close; stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; grpc_stream_op_buffer outgoing_sopb; grpc_chttp2_data_parser parser; grpc_stream_state callback_state; grpc_stream_op_buffer callback_sopb; }; static const grpc_transport_vtable vtable; static void push_setting(transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); static int prepare_callbacks(transport *t); static void run_callbacks(transport *t); static int prepare_write(transport *t); static void finish_write(void *t, grpc_endpoint_cb_status status); static void lock(transport *t); static void unlock(transport *t); static void drop_connection(transport *t); static void end_all_the_calls(transport *t); static stream *stream_list_remove_head(transport *t, stream_list_id id); static void stream_list_remove(transport *t, stream *s, stream_list_id id); static void stream_list_add_tail(transport *t, stream *s, stream_list_id id); static void stream_list_join(transport *t, stream *s, stream_list_id id); static void cancel_stream_id(transport *t, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, int send_rst); static void cancel_stream(transport *t, stream *s, grpc_status_code local_status, grpc_chttp2_error_code error_code, int send_rst); static stream *lookup_stream(transport *t, gpr_uint32 id); static void remove_from_stream_map(transport *t, stream *s); static void maybe_start_some_streams(transport *t); static void become_skip_parser(transport *t); /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ static void unref_transport(transport *t) { size_t i; if (!gpr_unref(&t->refs)) return; gpr_mu_lock(&t->mu); GPR_ASSERT(t->ep == NULL); gpr_slice_buffer_destroy(&t->outbuf); gpr_slice_buffer_destroy(&t->qbuf); grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); grpc_mdstr_unref(t->str_grpc_timeout); for (i = 0; i < STREAM_LIST_COUNT; i++) { GPR_ASSERT(t->lists[i].head == NULL); GPR_ASSERT(t->lists[i].tail == NULL); } GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0); grpc_chttp2_stream_map_destroy(&t->stream_map); gpr_mu_unlock(&t->mu); gpr_mu_destroy(&t->mu); /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ for (i = 0; i < t->ping_count; i++) { t->pings[i].cb(t->pings[i].user_data); } gpr_free(t->pings); gpr_free(t); } static void ref_transport(transport *t) { gpr_ref(&t->refs); } static void init_transport(transport *t, grpc_transport_setup_callback setup, void *arg, const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, int is_client) { size_t i; int j; grpc_transport_setup_result sr; GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN); t->base.vtable = &vtable; t->ep = ep; /* one ref is for destroy, the other for when ep becomes NULL */ gpr_ref_init(&t->refs, 2); gpr_mu_init(&t->mu); gpr_cv_init(&t->cv); t->metadata_context = mdctx; t->str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); t->reading = 1; t->writing = 0; t->error_state = ERROR_STATE_NONE; t->next_stream_id = is_client ? 1 : 2; t->is_client = is_client; t->outgoing_window = DEFAULT_WINDOW; t->incoming_window = DEFAULT_WINDOW; t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; t->expect_continuation_stream_id = 0; t->pings = NULL; t->ping_count = 0; t->ping_capacity = 0; t->ping_counter = gpr_now().tv_nsec; grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx); gpr_slice_buffer_init(&t->outbuf); gpr_slice_buffer_init(&t->qbuf); if (is_client) { gpr_slice_buffer_add(&t->qbuf, gpr_slice_from_copied_string(CLIENT_CONNECT_STRING)); } /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet large enough that the exponential growth should happen nicely when it's needed. TODO(ctiller): tune this */ grpc_chttp2_stream_map_init(&t->stream_map, 8); memset(&t->lists, 0, sizeof(t->lists)); /* copy in initial settings to all setting sets */ for (i = 0; i < NUM_SETTING_SETS; i++) { for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) { t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value; } } t->dirtied_local_settings = 1; t->sent_local_settings = 0; /* configure http2 the way we like it */ if (t->is_client) { push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) { if (t->is_client) { gpr_log(GPR_ERROR, "%s: is ignored on the client", GRPC_ARG_MAX_CONCURRENT_STREAMS); } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) { gpr_log(GPR_ERROR, "%s: must be an integer", GRPC_ARG_MAX_CONCURRENT_STREAMS); } else { push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, channel_args->args[i].value.integer); } } } } gpr_mu_lock(&t->mu); t->calling_back = 1; ref_transport(t); gpr_mu_unlock(&t->mu); sr = setup(arg, &t->base, t->metadata_context); lock(t); t->cb = sr.callbacks; t->cb_user_data = sr.user_data; grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context); t->calling_back = 0; gpr_cv_broadcast(&t->cv); unlock(t); unref_transport(t); } static void destroy_transport(grpc_transport *gt) { transport *t = (transport *)gt; gpr_mu_lock(&t->mu); while (t->calling_back) { gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); } t->cb = NULL; gpr_mu_unlock(&t->mu); unref_transport(t); } static void close_transport(grpc_transport *gt) { transport *t = (transport *)gt; gpr_mu_lock(&t->mu); if (t->ep) { grpc_endpoint_shutdown(t->ep); } gpr_mu_unlock(&t->mu); } static int init_stream(grpc_transport *gt, grpc_stream *gs, const void *server_data) { transport *t = (transport *)gt; stream *s = (stream *)gs; ref_transport(t); if (!server_data) { lock(t); s->id = 0; } else { s->id = (gpr_uint32)(gpr_uintptr)server_data; t->incoming_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); } s->outgoing_window = DEFAULT_WINDOW; s->incoming_window = DEFAULT_WINDOW; s->write_closed = 0; s->read_closed = 0; s->cancelled = 0; s->allow_window_updates = 0; s->published_close = 0; memset(&s->links, 0, sizeof(s->links)); memset(&s->included, 0, sizeof(s->included)); grpc_sopb_init(&s->outgoing_sopb); grpc_chttp2_data_parser_init(&s->parser); grpc_sopb_init(&s->callback_sopb); if (!server_data) { unlock(t); } return 0; } static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { transport *t = (transport *)gt; stream *s = (stream *)gs; size_t i; gpr_mu_lock(&t->mu); /* await pending callbacks TODO(ctiller): this could be optimized to check if this stream is getting callbacks */ while (t->calling_back) { gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); } /* stop parsing if we're currently parsing this stream */ if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id && s->id != 0) { become_skip_parser(t); } for (i = 0; i < STREAM_LIST_COUNT; i++) { stream_list_remove(t, s, i); } remove_from_stream_map(t, s); gpr_cv_broadcast(&t->cv); gpr_mu_unlock(&t->mu); grpc_sopb_destroy(&s->outgoing_sopb); grpc_chttp2_data_parser_destroy(&s->parser); grpc_sopb_destroy(&s->callback_sopb); unref_transport(t); } /* * LIST MANAGEMENT */ static stream *stream_list_remove_head(transport *t, stream_list_id id) { stream *s = t->lists[id].head; if (s) { stream *new_head = s->links[id].next; GPR_ASSERT(s->included[id]); if (new_head) { t->lists[id].head = new_head; new_head->links[id].prev = NULL; } else { t->lists[id].head = NULL; t->lists[id].tail = NULL; } s->included[id] = 0; } return s; } static void stream_list_remove(transport *t, stream *s, stream_list_id id) { if (!s->included[id]) return; s->included[id] = 0; if (s->links[id].prev) { s->links[id].prev->links[id].next = s->links[id].next; } else { GPR_ASSERT(t->lists[id].head == s); t->lists[id].head = s->links[id].next; } if (s->links[id].next) { s->links[id].next->links[id].prev = s->links[id].prev; } else { t->lists[id].tail = s->links[id].prev; } } static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) { stream *old_tail; GPR_ASSERT(!s->included[id]); old_tail = t->lists[id].tail; s->links[id].next = NULL; s->links[id].prev = old_tail; if (old_tail) { old_tail->links[id].next = s; } else { s->links[id].prev = NULL; t->lists[id].head = s; } t->lists[id].tail = s; s->included[id] = 1; } static void stream_list_join(transport *t, stream *s, stream_list_id id) { if (s->included[id]) { return; } stream_list_add_tail(t, s, id); } static void remove_from_stream_map(transport *t, stream *s) { if (s->id == 0) return; if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) { maybe_start_some_streams(t); } } /* * LOCK MANAGEMENT */ /* We take a transport-global lock in response to calls coming in from above, and in response to data being received from below. New data to be written is always queued, as are callbacks to process data. During unlock() we check our todo lists and initiate callbacks and flush writes. */ static void lock(transport *t) { gpr_mu_lock(&t->mu); } static void unlock(transport *t) { int start_write = 0; int perform_callbacks = 0; int call_closed = 0; grpc_endpoint *ep = t->ep; /* see if we need to trigger a write - and if so, get the data ready */ if (ep && !t->writing) { t->writing = start_write = prepare_write(t); if (start_write) { ref_transport(t); } } /* gather any callbacks that need to be made */ if (!t->calling_back && t->cb) { perform_callbacks = prepare_callbacks(t); if (perform_callbacks) { t->calling_back = 1; } if (t->error_state == ERROR_STATE_SEEN) { call_closed = 1; t->calling_back = 1; t->error_state = ERROR_STATE_NOTIFIED; } } if (perform_callbacks || call_closed) { ref_transport(t); } /* finally unlock */ gpr_mu_unlock(&t->mu); /* perform some callbacks if necessary */ if (perform_callbacks) { run_callbacks(t); } if (call_closed) { t->cb->closed(t->cb_user_data, &t->base); } /* write some bytes if necessary */ while (start_write) { switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count, finish_write, t, gpr_inf_future)) { case GRPC_ENDPOINT_WRITE_DONE: /* grab the lock directly without wrappers since we just want to continue writes if we loop: no need to check read callbacks again */ gpr_mu_lock(&t->mu); t->outbuf.count = 0; t->outbuf.length = 0; t->writing = start_write = prepare_write(t); if (!start_write) { if (!t->reading) { grpc_endpoint_destroy(t->ep); t->ep = NULL; gpr_cv_broadcast(&t->cv); /* endpoint ref: safe because we'll still have the ref for write */ unref_transport(t); } } gpr_mu_unlock(&t->mu); if (!start_write) { unref_transport(t); } break; case GRPC_ENDPOINT_WRITE_ERROR: start_write = 0; /* use the wrapper lock/unlock here as we drop_connection, causing read callbacks to be queued (which will be cleared during unlock) */ lock(t); t->outbuf.count = 0; t->outbuf.length = 0; t->writing = 0; drop_connection(t); if (!t->reading) { grpc_endpoint_destroy(t->ep); t->ep = NULL; gpr_cv_broadcast(&t->cv); /* endpoint ref: safe because we'll still have the ref for write */ unref_transport(t); } unlock(t); unref_transport(t); break; case GRPC_ENDPOINT_WRITE_PENDING: start_write = 0; break; } } if (perform_callbacks || call_closed) { lock(t); t->calling_back = 0; gpr_cv_broadcast(&t->cv); unlock(t); unref_transport(t); } } /* * OUTPUT PROCESSING */ static void push_setting(transport *t, grpc_chttp2_setting_id id, gpr_uint32 value) { const grpc_chttp2_setting_parameters *sp = &grpc_chttp2_settings_parameters[id]; gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value); if (use_value != value) { gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name, value, use_value); } if (use_value != t->settings[LOCAL_SETTINGS][id]) { t->settings[LOCAL_SETTINGS][id] = use_value; t->dirtied_local_settings = 1; } } static void finish_write(void *tp, grpc_endpoint_cb_status error) { transport *t = tp; lock(t); if (error != GRPC_ENDPOINT_CB_OK) { drop_connection(t); } t->outbuf.count = 0; t->outbuf.length = 0; /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ t->writing = 0; if (!t->reading) { grpc_endpoint_destroy(t->ep); t->ep = NULL; gpr_cv_broadcast(&t->cv); unref_transport(t); /* safe because we'll still have the ref for write */ } unlock(t); unref_transport(t); } static int prepare_write(transport *t) { stream *s; gpr_slice_buffer tempbuf; /* simple writes are queued to qbuf, and flushed here */ tempbuf = t->qbuf; t->qbuf = t->outbuf; t->outbuf = tempbuf; GPR_ASSERT(t->qbuf.count == 0); if (t->dirtied_local_settings && !t->sent_local_settings) { gpr_slice_buffer_add( &t->outbuf, grpc_chttp2_settings_create(t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS], GRPC_CHTTP2_NUM_SETTINGS)); t->dirtied_local_settings = 0; t->sent_local_settings = 1; } /* for each stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE))) { gpr_uint32 written = grpc_chttp2_encode_some( s->outgoing_sopb.ops, &s->outgoing_sopb.nops, s->write_closed, &t->outbuf, GPR_MIN(t->outgoing_window, s->outgoing_window), s->id, &t->hpack_compressor); t->outgoing_window -= written; s->outgoing_window -= written; /* if there are no more writes to do and writes are closed, we need to queue a callback to let the application know */ if (s->write_closed && s->outgoing_sopb.nops == 0) { stream_list_join(t, s, PENDING_CALLBACKS); } /* if there are still writes to do and the stream still has window available, then schedule a further write */ if (s->outgoing_sopb.nops && s->outgoing_window) { GPR_ASSERT(!t->outgoing_window); stream_list_add_tail(t, s, WRITABLE); } } /* for each stream that wants to update its window, add that window here */ while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { gpr_uint32 window_add = DEFAULT_WINDOW - s->incoming_window; if (!s->read_closed && window_add) { gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create(s->id, window_add)); s->incoming_window += window_add; } } /* if the transport is ready to send a window update, do so here also */ if (t->incoming_window < DEFAULT_WINDOW / 2) { gpr_uint32 window_add = DEFAULT_WINDOW - t->incoming_window; gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create(0, window_add)); t->incoming_window += window_add; } return t->outbuf.length > 0; } static void maybe_start_some_streams(transport *t) { while ( grpc_chttp2_stream_map_size(&t->stream_map) < t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); if (!s) break; GPR_ASSERT(s->id == 0); s->id = t->next_stream_id; t->next_stream_id += 2; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); stream_list_join(t, s, WRITABLE); } } static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, size_t ops_count, int is_last) { transport *t = (transport *)gt; stream *s = (stream *)gs; lock(t); if (is_last) { s->write_closed = 1; } if (!s->cancelled) { grpc_sopb_append(&s->outgoing_sopb, ops, ops_count); if (is_last && s->outgoing_sopb.nops == 0) { if (s->id != 0) { gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_data_frame_create_empty_close(s->id)); } } else if (s->id == 0) { stream_list_join(t, s, WAITING_FOR_CONCURRENCY); maybe_start_some_streams(t); } else if (s->outgoing_window) { stream_list_join(t, s, WRITABLE); } } else { grpc_stream_ops_unref_owned_objects(ops, ops_count); } if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed) { stream_list_join(t, s, PENDING_CALLBACKS); } unlock(t); } static void abort_stream(grpc_transport *gt, grpc_stream *gs, grpc_status_code status) { transport *t = (transport *)gt; stream *s = (stream *)gs; lock(t); cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status), 1); unlock(t); } static void send_ping(grpc_transport *gt, void (*cb)(void *user_data), void *user_data) { transport *t = (transport *)gt; outstanding_ping *p; lock(t); if (t->ping_capacity == t->ping_count) { t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2); t->pings = gpr_realloc(t->pings, sizeof(outstanding_ping) * t->ping_capacity); } p = &t->pings[t->ping_count++]; p->id[0] = t->ping_counter >> 56; p->id[1] = t->ping_counter >> 48; p->id[2] = t->ping_counter >> 40; p->id[3] = t->ping_counter >> 32; p->id[4] = t->ping_counter >> 24; p->id[5] = t->ping_counter >> 16; p->id[6] = t->ping_counter >> 8; p->id[7] = t->ping_counter; p->cb = cb; p->user_data = user_data; gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id)); unlock(t); } /* * INPUT PROCESSING */ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, int send_rst) { char buffer[32]; int had_outgoing; if (s) { /* clear out any unreported input & output: nobody cares anymore */ grpc_sopb_reset(&s->parser.incoming_sopb); had_outgoing = s->outgoing_sopb.nops != 0; grpc_sopb_reset(&s->outgoing_sopb); if (s->cancelled) { send_rst = 0; } else if (!s->read_closed || !s->write_closed || had_outgoing) { s->cancelled = 1; s->read_closed = 1; s->write_closed = 1; sprintf(buffer, "%d", local_status); grpc_sopb_add_metadata( &s->parser.incoming_sopb, grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); stream_list_join(t, s, PENDING_CALLBACKS); } } if (!id) send_rst = 0; if (send_rst) { gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create(id, error_code)); } } static void cancel_stream_id(transport *t, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, int send_rst) { cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code, send_rst); } static void cancel_stream(transport *t, stream *s, grpc_status_code local_status, grpc_chttp2_error_code error_code, int send_rst) { cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst); } static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) { cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE, GRPC_CHTTP2_INTERNAL_ERROR, 0); } static void end_all_the_calls(transport *t) { grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t); } static void drop_connection(transport *t) { if (t->error_state == ERROR_STATE_NONE) { t->error_state = ERROR_STATE_SEEN; } end_all_the_calls(t); } static void maybe_join_window_updates(transport *t, stream *s) { if (s->allow_window_updates && s->incoming_window < DEFAULT_WINDOW / 2) { stream_list_join(t, s, WINDOW_UPDATE); } } static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp, int allow) { transport *t = (transport *)tp; stream *s = (stream *)sp; lock(t); s->allow_window_updates = allow; if (allow) { maybe_join_window_updates(t, s); } else { stream_list_remove(t, s, WINDOW_UPDATE); } unlock(t); } static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { if (t->incoming_frame_size > t->incoming_window) { gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", t->incoming_frame_size, t->incoming_window); return GRPC_CHTTP2_CONNECTION_ERROR; } if (t->incoming_frame_size > s->incoming_window) { gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", t->incoming_frame_size, s->incoming_window); return GRPC_CHTTP2_CONNECTION_ERROR; } t->incoming_window -= t->incoming_frame_size; s->incoming_window -= t->incoming_frame_size; /* if the stream incoming window is getting low, schedule an update */ maybe_join_window_updates(t, s); return GRPC_CHTTP2_PARSE_OK; } static stream *lookup_stream(transport *t, gpr_uint32 id) { return grpc_chttp2_stream_map_find(&t->stream_map, id); } static grpc_chttp2_parse_error skip_parser(void *parser, grpc_chttp2_parse_state *st, gpr_slice slice, int is_last) { return GRPC_CHTTP2_PARSE_OK; } static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); } static int init_skip_frame(transport *t, int is_header) { if (is_header) { int is_eoh = t->expect_continuation_stream_id != 0; t->parser = grpc_chttp2_header_parser_parse; t->parser_data = &t->hpack_parser; t->hpack_parser.on_header = skip_header; t->hpack_parser.on_header_user_data = NULL; t->hpack_parser.is_boundary = is_eoh; t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0; } else { t->parser = skip_parser; } return 1; } static void become_skip_parser(transport *t) { init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse); } static int init_data_frame_parser(transport *t) { stream *s = lookup_stream(t, t->incoming_stream_id); grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK; if (!s || s->read_closed) return init_skip_frame(t, 0); if (err == GRPC_CHTTP2_PARSE_OK) { err = update_incoming_window(t, s); } if (err == GRPC_CHTTP2_PARSE_OK) { err = grpc_chttp2_data_parser_begin_frame(&s->parser, t->incoming_frame_flags); } switch (err) { case GRPC_CHTTP2_PARSE_OK: t->incoming_stream = s; t->parser = grpc_chttp2_data_parser_parse; t->parser_data = &s->parser; return 1; case GRPC_CHTTP2_STREAM_ERROR: cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( GRPC_CHTTP2_INTERNAL_ERROR), GRPC_CHTTP2_INTERNAL_ERROR, 1); return init_skip_frame(t, 0); case GRPC_CHTTP2_CONNECTION_ERROR: drop_connection(t); return 0; } gpr_log(GPR_ERROR, "should never reach here"); abort(); return 0; } static void free_timeout(void *p) { gpr_free(p); } static void on_header(void *tp, grpc_mdelem *md) { transport *t = tp; stream *s = t->incoming_stream; GPR_ASSERT(s); stream_list_join(t, s, PENDING_CALLBACKS); if (md->key == t->str_grpc_timeout) { gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); if (!cached_timeout) { /* not already parsed: parse it now, and store the result away */ cached_timeout = gpr_malloc(sizeof(gpr_timespec)); if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value), cached_timeout)) { gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", grpc_mdstr_as_c_string(md->value)); *cached_timeout = gpr_inf_future; } grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } grpc_sopb_add_deadline(&s->parser.incoming_sopb, gpr_time_add(gpr_now(), *cached_timeout)); grpc_mdelem_unref(md); } else { grpc_sopb_add_metadata(&s->parser.incoming_sopb, md); } } static int init_header_frame_parser(transport *t, int is_continuation) { int is_eoh = (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; stream *s; if (is_eoh) { t->expect_continuation_stream_id = 0; } else { t->expect_continuation_stream_id = t->incoming_stream_id; } if (!is_continuation) { t->header_eof = (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; } /* could be a new stream or an existing stream */ s = lookup_stream(t, t->incoming_stream_id); if (!s) { if (is_continuation) { gpr_log(GPR_ERROR, "stream disbanded before CONTINUATION received"); return init_skip_frame(t, 1); } if (t->is_client) { if ((t->incoming_stream_id & 1) && t->incoming_stream_id < t->next_stream_id) { /* this is an old (probably cancelled) stream */ } else { gpr_log(GPR_ERROR, "ignoring new stream creation on client"); } return init_skip_frame(t, 1); } t->incoming_stream = NULL; /* if stream is accepted, we set incoming_stream in init_stream */ t->cb->accept_stream(t->cb_user_data, &t->base, (void *)(gpr_uintptr)t->incoming_stream_id); s = t->incoming_stream; if (!s) { gpr_log(GPR_ERROR, "stream not accepted"); return init_skip_frame(t, 1); } } else { t->incoming_stream = s; } if (t->incoming_stream->read_closed) { gpr_log(GPR_ERROR, "skipping already closed stream header"); t->incoming_stream = NULL; return init_skip_frame(t, 1); } t->parser = grpc_chttp2_header_parser_parse; t->parser_data = &t->hpack_parser; t->hpack_parser.on_header = on_header; t->hpack_parser.on_header_user_data = t; t->hpack_parser.is_boundary = is_eoh; t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0; if (!is_continuation && (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) { grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser); } return 1; } static int init_window_update_frame_parser(transport *t) { int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame( &t->simple_parsers.window_update, t->incoming_frame_size, t->incoming_frame_flags); if (!ok) { drop_connection(t); } t->parser = grpc_chttp2_window_update_parser_parse; t->parser_data = &t->simple_parsers.window_update; return ok; } static int init_ping_parser(transport *t) { int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_ping_parser_begin_frame(&t->simple_parsers.ping, t->incoming_frame_size, t->incoming_frame_flags); if (!ok) { drop_connection(t); } t->parser = grpc_chttp2_ping_parser_parse; t->parser_data = &t->simple_parsers.ping; return ok; } static int init_settings_frame_parser(transport *t) { int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_settings_parser_begin_frame( &t->simple_parsers.settings, t->incoming_frame_size, t->incoming_frame_flags, t->settings[PEER_SETTINGS]); if (!ok) { drop_connection(t); } if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS], GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32)); } t->parser = grpc_chttp2_settings_parser_parse; t->parser_data = &t->simple_parsers.settings; return ok; } static int init_frame_parser(transport *t) { if (t->expect_continuation_stream_id != 0) { if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) { gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x", t->incoming_frame_type); return 0; } if (t->expect_continuation_stream_id != t->incoming_stream_id) { gpr_log(GPR_ERROR, "Expected CONTINUATION frame for stream %08x, got stream %08x", t->expect_continuation_stream_id, t->incoming_stream_id); return 0; } return init_header_frame_parser(t, 1); } switch (t->incoming_frame_type) { case GRPC_CHTTP2_FRAME_DATA: return init_data_frame_parser(t); case GRPC_CHTTP2_FRAME_HEADER: return init_header_frame_parser(t, 0); case GRPC_CHTTP2_FRAME_CONTINUATION: gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame"); return 0; case GRPC_CHTTP2_FRAME_RST_STREAM: /* TODO(ctiller): actually parse the reason */ cancel_stream_id( t, t->incoming_stream_id, grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL), GRPC_CHTTP2_CANCEL, 0); return init_skip_frame(t, 0); case GRPC_CHTTP2_FRAME_SETTINGS: return init_settings_frame_parser(t); case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: return init_window_update_frame_parser(t); case GRPC_CHTTP2_FRAME_PING: return init_ping_parser(t); default: gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type); return init_skip_frame(t, 0); } } static int is_window_update_legal(gpr_uint32 window_update, gpr_uint32 window) { return window_update < MAX_WINDOW - window; } static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { grpc_chttp2_parse_state st; size_t i; memset(&st, 0, sizeof(st)); switch (t->parser(t->parser_data, &st, slice, is_last)) { case GRPC_CHTTP2_PARSE_OK: if (st.end_of_stream) { t->incoming_stream->read_closed = 1; stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); } if (st.need_flush_reads) { stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); } if (st.metadata_boundary) { grpc_sopb_add_metadata_boundary( &t->incoming_stream->parser.incoming_sopb); stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); } if (st.ack_settings) { gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); maybe_start_some_streams(t); } if (st.send_ping_ack) { gpr_slice_buffer_add( &t->qbuf, grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes)); } if (st.process_ping_reply) { for (i = 0; i < t->ping_count; i++) { if (0 == memcmp(t->pings[i].id, t->simple_parsers.ping.opaque_8bytes, 8)) { t->pings[i].cb(t->pings[i].user_data); memmove(&t->pings[i], &t->pings[i + 1], (t->ping_count - i - 1) * sizeof(outstanding_ping)); t->ping_count--; break; } } } if (st.window_update) { if (t->incoming_stream_id) { /* if there was a stream id, this is for some stream */ stream *s = lookup_stream(t, t->incoming_stream_id); if (s) { int was_window_empty = s->outgoing_window == 0; if (!is_window_update_legal(st.window_update, s->outgoing_window)) { cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( GRPC_CHTTP2_FLOW_CONTROL_ERROR), GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1); } else { s->outgoing_window += st.window_update; /* if this window update makes outgoing ops writable again, flag that */ if (was_window_empty && s->outgoing_sopb.nops) { stream_list_join(t, s, WRITABLE); } } } } else { /* transport level window update */ if (!is_window_update_legal(st.window_update, t->outgoing_window)) { drop_connection(t); } else { t->outgoing_window += st.window_update; } } } return 1; case GRPC_CHTTP2_STREAM_ERROR: become_skip_parser(t); cancel_stream_id( t, t->incoming_stream_id, grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR), GRPC_CHTTP2_INTERNAL_ERROR, 1); return 1; case GRPC_CHTTP2_CONNECTION_ERROR: drop_connection(t); return 0; } gpr_log(GPR_ERROR, "should never reach here"); abort(); return 0; } static int process_read(transport *t, gpr_slice slice) { gpr_uint8 *beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; if (cur == end) return 1; switch (t->deframe_state) { case DTS_CLIENT_PREFIX_0: case DTS_CLIENT_PREFIX_1: case DTS_CLIENT_PREFIX_2: case DTS_CLIENT_PREFIX_3: case DTS_CLIENT_PREFIX_4: case DTS_CLIENT_PREFIX_5: case DTS_CLIENT_PREFIX_6: case DTS_CLIENT_PREFIX_7: case DTS_CLIENT_PREFIX_8: case DTS_CLIENT_PREFIX_9: case DTS_CLIENT_PREFIX_10: case DTS_CLIENT_PREFIX_11: case DTS_CLIENT_PREFIX_12: case DTS_CLIENT_PREFIX_13: case DTS_CLIENT_PREFIX_14: case DTS_CLIENT_PREFIX_15: case DTS_CLIENT_PREFIX_16: case DTS_CLIENT_PREFIX_17: case DTS_CLIENT_PREFIX_18: case DTS_CLIENT_PREFIX_19: case DTS_CLIENT_PREFIX_20: case DTS_CLIENT_PREFIX_21: case DTS_CLIENT_PREFIX_22: case DTS_CLIENT_PREFIX_23: while (cur != end && t->deframe_state != DTS_FH_0) { if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) { gpr_log(GPR_ERROR, "Connect string mismatch: expected '%c' (%d) got '%c' (%d) " "at byte %d", CLIENT_CONNECT_STRING[t->deframe_state], (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur, (int)*cur, t->deframe_state); return 0; } ++cur; ++t->deframe_state; } if (cur == end) { return 1; } /* fallthrough */ dts_fh_0: case DTS_FH_0: GPR_ASSERT(cur < end); t->incoming_frame_size = ((gpr_uint32)*cur) << 16; if (++cur == end) { t->deframe_state = DTS_FH_1; return 1; } /* fallthrough */ case DTS_FH_1: GPR_ASSERT(cur < end); t->incoming_frame_size |= ((gpr_uint32)*cur) << 8; if (++cur == end) { t->deframe_state = DTS_FH_2; return 1; } /* fallthrough */ case DTS_FH_2: GPR_ASSERT(cur < end); t->incoming_frame_size |= *cur; if (++cur == end) { t->deframe_state = DTS_FH_3; return 1; } /* fallthrough */ case DTS_FH_3: GPR_ASSERT(cur < end); t->incoming_frame_type = *cur; if (++cur == end) { t->deframe_state = DTS_FH_4; return 1; } /* fallthrough */ case DTS_FH_4: GPR_ASSERT(cur < end); t->incoming_frame_flags = *cur; if (++cur == end) { t->deframe_state = DTS_FH_5; return 1; } /* fallthrough */ case DTS_FH_5: GPR_ASSERT(cur < end); t->incoming_stream_id = (((gpr_uint32)*cur) << 24) & 0x7f; if (++cur == end) { t->deframe_state = DTS_FH_6; return 1; } /* fallthrough */ case DTS_FH_6: GPR_ASSERT(cur < end); t->incoming_stream_id |= ((gpr_uint32)*cur) << 16; if (++cur == end) { t->deframe_state = DTS_FH_7; return 1; } /* fallthrough */ case DTS_FH_7: GPR_ASSERT(cur < end); t->incoming_stream_id |= ((gpr_uint32)*cur) << 8; if (++cur == end) { t->deframe_state = DTS_FH_8; return 1; } /* fallthrough */ case DTS_FH_8: GPR_ASSERT(cur < end); t->incoming_stream_id |= ((gpr_uint32)*cur); t->deframe_state = DTS_FRAME; if (!init_frame_parser(t)) { return 0; } if (t->incoming_frame_size == 0) { if (!parse_frame_slice(t, gpr_empty_slice(), 1)) { return 0; } if (++cur == end) { t->deframe_state = DTS_FH_0; return 1; } goto dts_fh_0; /* loop */ } if (++cur == end) { return 1; } /* fallthrough */ case DTS_FRAME: GPR_ASSERT(cur < end); if (end - cur == t->incoming_frame_size) { if (!parse_frame_slice( t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) { return 0; } t->deframe_state = DTS_FH_0; return 1; } else if (end - cur > t->incoming_frame_size) { if (!parse_frame_slice( t, gpr_slice_sub_no_ref(slice, cur - beg, cur + t->incoming_frame_size - beg), 1)) { return 0; } cur += t->incoming_frame_size; goto dts_fh_0; /* loop */ } else { if (!parse_frame_slice( t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) { return 0; } t->incoming_frame_size -= (end - cur); return 1; } gpr_log(GPR_ERROR, "should never reach here"); abort(); } gpr_log(GPR_ERROR, "should never reach here"); abort(); } /* tcp read callback */ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error) { transport *t = tp; size_t i; int keep_reading = 0; switch (error) { case GRPC_ENDPOINT_CB_SHUTDOWN: case GRPC_ENDPOINT_CB_EOF: case GRPC_ENDPOINT_CB_ERROR: case GRPC_ENDPOINT_CB_TIMED_OUT: lock(t); drop_connection(t); t->reading = 0; if (!t->writing && t->ep) { grpc_endpoint_destroy(t->ep); t->ep = NULL; gpr_cv_broadcast(&t->cv); unref_transport(t); /* safe as we still have a ref for read */ } unlock(t); unref_transport(t); break; case GRPC_ENDPOINT_CB_OK: lock(t); for (i = 0; i < nslices && process_read(t, slices[i]); i++) ; unlock(t); keep_reading = 1; break; } for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]); if (keep_reading) { grpc_endpoint_notify_on_read(t->ep, recv_data, t, gpr_inf_future); } } /* * CALLBACK LOOP */ static grpc_stream_state compute_state(gpr_uint8 write_closed, gpr_uint8 read_closed) { if (write_closed && read_closed) return GRPC_STREAM_CLOSED; if (write_closed) return GRPC_STREAM_SEND_CLOSED; if (read_closed) return GRPC_STREAM_RECV_CLOSED; return GRPC_STREAM_OPEN; } static int prepare_callbacks(transport *t) { stream *s; grpc_stream_op_buffer temp_sopb; int n = 0; while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) { int execute = 1; temp_sopb = s->parser.incoming_sopb; s->parser.incoming_sopb = s->callback_sopb; s->callback_sopb = temp_sopb; s->callback_state = compute_state( s->write_closed && s->outgoing_sopb.nops == 0, s->read_closed); if (s->callback_state == GRPC_STREAM_CLOSED) { remove_from_stream_map(t, s); if (s->published_close) { execute = 0; } s->published_close = 1; } if (execute) { stream_list_add_tail(t, s, EXECUTING_CALLBACKS); n = 1; } } return n; } static void run_callbacks(transport *t) { stream *s; while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) { size_t nops = s->callback_sopb.nops; s->callback_sopb.nops = 0; t->cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s, s->callback_sopb.ops, nops, s->callback_state); } } /* * INTEGRATION GLUE */ static const grpc_transport_vtable vtable = { sizeof(stream), init_stream, send_batch, set_allow_window_updates, destroy_stream, abort_stream, close_transport, send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, void *arg, const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices, size_t nslices, grpc_mdctx *mdctx, int is_client) { transport *t = gpr_malloc(sizeof(transport)); init_transport(t, setup, arg, channel_args, ep, mdctx, is_client); ref_transport(t); recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); }