diff options
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r-- | src/core/transport/chttp2_transport.c | 826 |
1 files changed, 579 insertions, 247 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index effc3c4b3b..10f52f4923 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -52,6 +52,7 @@ #include "src/core/transport/transport_impl.h" #define DEFAULT_WINDOW 65535 +#define GRPC_CHTTP2_STREAM_LOOKAHEAD DEFAULT_WINDOW #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu @@ -75,14 +76,14 @@ int grpc_flowctl_trace = 0; #define STREAM_FROM_GLOBAL(sg) \ ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global))) +#define STREAM_FROM_PARSING(sg) \ + ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, parsing))) + static const grpc_transport_vtable vtable; static void lock(grpc_chttp2_transport *t); static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); -static void unlock_check_read_write_state(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); - /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(grpc_exec_ctx *exec_ctx, void *t, int iomgr_success_ignored); @@ -103,11 +104,13 @@ static void perform_stream_op_locked( grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op); /** Cancel a stream: coming from the transport API */ -static void cancel_from_api(grpc_chttp2_transport_global *transport_global, +static void cancel_from_api(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_status_code status); -static void close_from_api(grpc_chttp2_transport_global *transport_global, +static void close_from_api(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_status_code status, gpr_slice *optional_message); @@ -128,6 +131,9 @@ static void connectivity_state_set( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state, const char *reason); +static void check_read_ops(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global); + /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -151,6 +157,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); GRPC_MDSTR_UNREF(t->parsing.str_grpc_timeout); + GRPC_MDELEM_UNREF(t->parsing.elem_grpc_status_ok); for (i = 0; i < STREAM_LIST_COUNT; i++) { GPR_ASSERT(t->lists[i].head == NULL); @@ -236,14 +243,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->endpoint_reading = 1; t->global.next_stream_id = is_client ? 1 : 2; t->global.is_client = is_client; - t->global.outgoing_window = DEFAULT_WINDOW; - t->global.incoming_window = DEFAULT_WINDOW; + t->writing.outgoing_window = DEFAULT_WINDOW; + t->parsing.incoming_window = DEFAULT_WINDOW; t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->global.ping_counter = 1; t->global.pings.next = t->global.pings.prev = &t->global.pings; t->parsing.is_client = is_client; t->parsing.str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); + t->parsing.elem_grpc_status_ok = + grpc_mdelem_from_strings(t->metadata_context, "grpc-status", "0"); t->parsing.deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; @@ -392,19 +401,46 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, } } +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global, + const char *reason) { + grpc_stream_ref(STREAM_FROM_GLOBAL(stream_global)->refcount, reason); +} +void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global, + const char *reason) { + grpc_stream_unref(exec_ctx, STREAM_FROM_GLOBAL(stream_global)->refcount, + reason); +} +#else +void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global) { + grpc_stream_ref(STREAM_FROM_GLOBAL(stream_global)->refcount); +} +void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global) { + grpc_stream_unref(exec_ctx, STREAM_FROM_GLOBAL(stream_global)->refcount); +} +#endif + static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, const void *server_data, - grpc_transport_stream_op *initial_op) { + grpc_stream *gs, grpc_stream_refcount *refcount, + const void *server_data) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; memset(s, 0, sizeof(*s)); - grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.incoming_metadata); - grpc_chttp2_incoming_metadata_buffer_init(&s->global.incoming_metadata); - grpc_sopb_init(&s->writing.sopb); - grpc_sopb_init(&s->global.incoming_sopb); + s->refcount = refcount; + GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2"); + + grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]); + grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_init( + &s->global.received_initial_metadata); + grpc_chttp2_incoming_metadata_buffer_init( + &s->global.received_trailing_metadata); grpc_chttp2_data_parser_init(&s->parsing.data_parser); + gpr_slice_buffer_init(&s->writing.flow_controlled_buffer); REF_TRANSPORT(t, "stream"); @@ -413,20 +449,17 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, if (server_data) { GPR_ASSERT(t->parsing_active); s->global.id = (gpr_uint32)(gpr_uintptr)server_data; + s->parsing.id = s->global.id; s->global.outgoing_window = t->global.settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->global.max_recv_bytes = s->parsing.incoming_window = - s->global.incoming_window = - t->global.settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + s->parsing.incoming_window = s->global.max_recv_bytes = + t->global.settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s); s->global.in_stream_map = 1; } - - if (initial_op) - perform_stream_op_locked(exec_ctx, &t->global, &s->global, initial_op); unlock(exec_ctx, t); return 0; @@ -437,10 +470,13 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; int i; + grpc_byte_stream *bs; + + GPR_TIMER_BEGIN("destroy_stream", 0); gpr_mu_lock(&t->mu); - GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || + GPR_ASSERT((s->global.write_closed && s->global.read_closed) || s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { @@ -451,8 +487,9 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->global.id) == NULL); } - grpc_chttp2_list_remove_incoming_window_updated(&t->global, &s->global); grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); + grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, + &s->global); gpr_mu_unlock(&t->mu); @@ -464,17 +501,29 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } } - GPR_ASSERT(s->global.outgoing_sopb == NULL); - GPR_ASSERT(s->global.publish_sopb == NULL); - grpc_sopb_destroy(&s->writing.sopb); - grpc_sopb_destroy(&s->global.incoming_sopb); + while ( + (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) { + grpc_byte_stream_destroy(bs); + } + + GPR_ASSERT(s->global.send_initial_metadata_finished == NULL); + GPR_ASSERT(s->global.send_message_finished == NULL); + GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL); + GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL); + GPR_ASSERT(s->global.recv_message_ready == NULL); + GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL); grpc_chttp2_data_parser_destroy(&s->parsing.data_parser); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.incoming_metadata); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.incoming_metadata); - grpc_chttp2_incoming_metadata_live_op_buffer_end( - &s->global.outstanding_metadata); + grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]); + grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_destroy( + &s->global.received_initial_metadata); + grpc_chttp2_incoming_metadata_buffer_destroy( + &s->global.received_trailing_metadata); + gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer); UNREF_TRANSPORT(exec_ctx, t, "stream"); + + GPR_TIMER_END("destroy_stream", 0); } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( @@ -486,12 +535,14 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( - grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + gpr_uint32 id) { grpc_chttp2_stream *accepting; grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); GPR_ASSERT(t->accepting_stream == NULL); t->accepting_stream = &accepting; - t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data, + t->channel_callback.accept_stream(exec_ctx, + t->channel_callback.accept_stream_user_data, &t->base, (void *)(gpr_uintptr)id); t->accepting_stream = NULL; return &accepting->parsing; @@ -511,7 +562,6 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { GPR_TIMER_BEGIN("unlock", 0); - unlock_check_read_write_state(exec_ctx, t); if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; @@ -519,6 +569,7 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1); prevent_endpoint_shutdown(t); } + check_read_ops(exec_ctx, &t->global); gpr_mu_unlock(&t->mu); GPR_TIMER_END("unlock", 0); @@ -558,7 +609,6 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, drop_connection(exec_ctx, t); } - /* cleanup writing related jazz */ grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); /* leave the writing flag up on shutdown to prevent further writes in unlock() @@ -598,6 +648,7 @@ void grpc_chttp2_add_incoming_goaway( static void maybe_start_some_streams( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { grpc_chttp2_stream_global *stream_global; + gpr_uint32 stream_incoming_window; /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID && @@ -607,13 +658,16 @@ static void maybe_start_some_streams( [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { + /* safe since we can't (legally) be parsing this stream yet */ + grpc_chttp2_stream_parsing *stream_parsing = + &STREAM_FROM_GLOBAL(stream_global)->parsing; GRPC_CHTTP2_IF_TRACING(gpr_log( GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", transport_global->is_client ? "CLI" : "SVR", stream_global, transport_global->next_stream_id)); GPR_ASSERT(stream_global->id == 0); - stream_global->id = transport_global->next_stream_id; + stream_global->id = stream_parsing->id = transport_global->next_stream_id; transport_global->next_stream_id += 2; if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { @@ -625,103 +679,166 @@ static void maybe_start_some_streams( stream_global->outgoing_window = transport_global->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - stream_global->incoming_window = + stream_parsing->incoming_window = stream_incoming_window = transport_global->settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; stream_global->max_recv_bytes = - GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes); + GPR_MAX(stream_incoming_window, stream_global->max_recv_bytes); grpc_chttp2_stream_map_add( &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->in_stream_map = 1; transport_global->concurrent_stream_count++; - grpc_chttp2_list_add_incoming_window_updated(transport_global, - stream_global); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { - cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE); + cancel_from_api(exec_ctx, transport_global, stream_global, + GRPC_STATUS_UNAVAILABLE); + } +} + +static grpc_closure *add_closure_barrier(grpc_closure *closure) { + closure->final_data += 2; + return closure; +} + +void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, + grpc_closure **pclosure, int success) { + grpc_closure *closure = *pclosure; + if (closure == NULL) { + return; + } + closure->final_data -= 2; + if (!success) { + closure->final_data |= 1; + } + if (closure->final_data < 2) { + grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0); } + *pclosure = NULL; +} + +static int contains_non_ok_status( + grpc_chttp2_transport_global *transport_global, + grpc_metadata_batch *batch) { + grpc_mdelem *ok_elem = + TRANSPORT_FROM_GLOBAL(transport_global)->parsing.elem_grpc_status_ok; + grpc_linked_mdelem *l; + for (l = batch->list.head; l; l = l->next) { + if (l->md->key == ok_elem->key && l->md != ok_elem) { + return 1; + } + } + return 0; } static void perform_stream_op_locked( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) { + grpc_closure *on_complete; + GPR_TIMER_BEGIN("perform_stream_op_locked", 0); + + on_complete = op->on_complete; + /* use final_data as a barrier until enqueue time; the inital counter is + dropped at the end of this function */ + on_complete->final_data = 2; + if (op->cancel_with_status != GRPC_STATUS_OK) { - cancel_from_api(transport_global, stream_global, op->cancel_with_status); + cancel_from_api(exec_ctx, transport_global, stream_global, + op->cancel_with_status); } if (op->close_with_status != GRPC_STATUS_OK) { - close_from_api(transport_global, stream_global, op->close_with_status, - op->optional_close_message); - } - - if (op->send_ops) { - GPR_ASSERT(stream_global->outgoing_sopb == NULL); - stream_global->send_done_closure = op->on_done_send; - if (!stream_global->cancelled) { - stream_global->written_anything = 1; - stream_global->outgoing_sopb = op->send_ops; - if (op->is_last_send && - stream_global->write_state == GRPC_WRITE_STATE_OPEN) { - stream_global->write_state = GRPC_WRITE_STATE_QUEUED_CLOSE; - } - if (stream_global->id == 0) { - GRPC_CHTTP2_IF_TRACING(gpr_log( - GPR_DEBUG, - "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency", - transport_global->is_client ? "CLI" : "SVR", stream_global)); + close_from_api(exec_ctx, transport_global, stream_global, + op->close_with_status, op->optional_close_message); + } + + if (op->send_initial_metadata != NULL) { + GPR_ASSERT(stream_global->send_initial_metadata_finished == NULL); + stream_global->send_initial_metadata_finished = + add_closure_barrier(on_complete); + stream_global->send_initial_metadata = op->send_initial_metadata; + if (contains_non_ok_status(transport_global, op->send_initial_metadata)) { + stream_global->seen_error = 1; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (!stream_global->write_closed) { + if (transport_global->is_client) { + GPR_ASSERT(stream_global->id == 0); grpc_chttp2_list_add_waiting_for_concurrency(transport_global, stream_global); maybe_start_some_streams(exec_ctx, transport_global); - } else if (stream_global->outgoing_window > 0) { + } else { + GPR_ASSERT(stream_global->id != 0); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } else { - grpc_sopb_reset(op->send_ops); - grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 0); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_initial_metadata_finished, 0); } } - if (op->recv_ops) { - GPR_ASSERT(stream_global->publish_sopb == NULL); - GPR_ASSERT(stream_global->published_state != GRPC_STREAM_CLOSED); - stream_global->recv_done_closure = op->on_done_recv; - stream_global->publish_sopb = op->recv_ops; - stream_global->publish_sopb->nops = 0; - stream_global->publish_state = op->recv_state; - /* clamp max recv bytes */ - op->max_recv_bytes = GPR_MIN(op->max_recv_bytes, GPR_UINT32_MAX); - if (stream_global->max_recv_bytes < op->max_recv_bytes) { - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "op", transport_global, stream_global, max_recv_bytes, - op->max_recv_bytes - stream_global->max_recv_bytes); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "op", transport_global, stream_global, unannounced_incoming_window, - op->max_recv_bytes - stream_global->max_recv_bytes); - stream_global->unannounced_incoming_window += - (gpr_uint32)op->max_recv_bytes - stream_global->max_recv_bytes; - stream_global->max_recv_bytes = (gpr_uint32)op->max_recv_bytes; + if (op->send_message != NULL) { + GPR_ASSERT(stream_global->send_message_finished == NULL); + GPR_ASSERT(stream_global->send_message == NULL); + stream_global->send_message_finished = add_closure_barrier(on_complete); + if (stream_global->write_closed) { + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_message_finished, 0); + } else if (stream_global->id != 0) { + stream_global->send_message = op->send_message; + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } - grpc_chttp2_incoming_metadata_live_op_buffer_end( - &stream_global->outstanding_metadata); - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); - if (stream_global->id != 0) { + } + + if (op->send_trailing_metadata != NULL) { + GPR_ASSERT(stream_global->send_trailing_metadata_finished == NULL); + stream_global->send_trailing_metadata_finished = + add_closure_barrier(on_complete); + stream_global->send_trailing_metadata = op->send_trailing_metadata; + if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) { + stream_global->seen_error = 1; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (stream_global->write_closed) { + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_trailing_metadata_finished, 0); + } else if (stream_global->id != 0) { + /* TODO(ctiller): check if there's flow control for any outstanding + bytes before going writable */ grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } - if (op->bind_pollset) { - add_to_pollset_locked(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), - op->bind_pollset); + if (op->recv_initial_metadata != NULL) { + GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL); + stream_global->recv_initial_metadata_finished = + add_closure_barrier(on_complete); + stream_global->recv_initial_metadata = op->recv_initial_metadata; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + if (op->recv_message != NULL) { + GPR_ASSERT(stream_global->recv_message_ready == NULL); + stream_global->recv_message_ready = op->recv_message_ready; + stream_global->recv_message = op->recv_message; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + + if (op->recv_trailing_metadata != NULL) { + GPR_ASSERT(stream_global->recv_trailing_metadata_finished == NULL); + stream_global->recv_trailing_metadata_finished = + add_closure_barrier(on_complete); + stream_global->recv_trailing_metadata = op->recv_trailing_metadata; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + + grpc_chttp2_complete_closure_step(exec_ctx, &on_complete, 1); + GPR_TIMER_END("perform_stream_op_locked", 0); } @@ -811,12 +928,49 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * INPUT PROCESSING */ -static grpc_stream_state compute_state(gpr_uint8 write_closed, - gpr_uint8 read_closed) { - if (write_closed && read_closed) return GRPC_STREAM_CLOSED; - if (write_closed) return GRPC_STREAM_SEND_CLOSED; - if (read_closed) return GRPC_STREAM_RECV_CLOSED; - return GRPC_STREAM_OPEN; +static void check_read_ops(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global) { + grpc_chttp2_stream_global *stream_global; + grpc_byte_stream *bs; + while ( + grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) { + if (stream_global->recv_initial_metadata_finished != NULL && + stream_global->published_initial_metadata) { + grpc_chttp2_incoming_metadata_buffer_publish( + &stream_global->received_initial_metadata, + stream_global->recv_initial_metadata); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->recv_initial_metadata_finished, 1); + } + if (stream_global->recv_message_ready != NULL) { + if (stream_global->incoming_frames.head != NULL) { + *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( + &stream_global->incoming_frames); + GPR_ASSERT(*stream_global->recv_message != NULL); + grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1); + stream_global->recv_message_ready = NULL; + } else if (stream_global->published_trailing_metadata) { + *stream_global->recv_message = NULL; + grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1); + stream_global->recv_message_ready = NULL; + } + } + if (stream_global->recv_trailing_metadata_finished != NULL && + stream_global->read_closed && stream_global->write_closed) { + while (stream_global->seen_error && + (bs = grpc_chttp2_incoming_frame_queue_pop( + &stream_global->incoming_frames)) != NULL) { + grpc_byte_stream_destroy(bs); + } + if (stream_global->incoming_frames.head == NULL) { + grpc_chttp2_incoming_metadata_buffer_publish( + &stream_global->received_trailing_metadata, + stream_global->recv_trailing_metadata); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->recv_trailing_metadata_finished, 1); + } + } + } } static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -832,7 +986,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->global.in_stream_map = 0; if (t->parsing.incoming_stream == &s->parsing) { t->parsing.incoming_stream = NULL; - grpc_chttp2_parsing_become_skip_parser(&t->parsing); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing); } if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { close_transport_locked(exec_ctx, t); @@ -847,109 +1001,10 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } -static void unlock_check_read_write_state(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { - grpc_chttp2_transport_global *transport_global = &t->global; - grpc_chttp2_stream_global *stream_global; - grpc_stream_state state; - - if (!t->parsing_active) { - /* if a stream is in the stream map, and gets cancelled, we need to ensure - we are not parsing before continuing the cancellation to keep things in - a sane state */ - while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, - &stream_global)) { - GPR_ASSERT(stream_global->in_stream_map); - GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_OPEN); - GPR_ASSERT(stream_global->read_closed); - remove_stream(exec_ctx, t, stream_global->id); - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); - } - } - - if (!t->writing_active) { - while (grpc_chttp2_list_pop_cancelled_waiting_for_writing(transport_global, - &stream_global)) { - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); - } - } - - while (grpc_chttp2_list_pop_read_write_state_changed(transport_global, - &stream_global)) { - if (stream_global->cancelled) { - if (t->writing_active && - stream_global->write_state != GRPC_WRITE_STATE_SENT_CLOSE) { - grpc_chttp2_list_add_cancelled_waiting_for_writing(transport_global, - stream_global); - } else { - stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; - if (stream_global->outgoing_sopb != NULL) { - grpc_sopb_reset(stream_global->outgoing_sopb); - stream_global->outgoing_sopb = NULL; - grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 1); - } - stream_global->read_closed = 1; - if (!stream_global->published_cancelled) { - char buffer[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(stream_global->cancelled_status, buffer); - grpc_chttp2_incoming_metadata_buffer_add( - &stream_global->incoming_metadata, - grpc_mdelem_from_strings(t->metadata_context, "grpc-status", - buffer)); - grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( - &stream_global->incoming_metadata, &stream_global->incoming_sopb); - stream_global->published_cancelled = 1; - } - } - } - if (stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE && - stream_global->read_closed && stream_global->in_stream_map) { - if (t->parsing_active) { - grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, - stream_global); - } else { - remove_stream(exec_ctx, t, stream_global->id); - } - } - if (!stream_global->publish_sopb) { - continue; - } - if (stream_global->writing_now != 0) { - continue; - } - /* FIXME(ctiller): we include in_stream_map in our computation of - whether the stream is write-closed. This is completely bogus, - but has the effect of delaying stream-closed until the stream - is indeed evicted from the stream map, making it safe to delete. - To fix this will require having an edge after stream-closed - indicating that the stream is closed AND safe to delete. */ - state = compute_state( - stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE && - !stream_global->in_stream_map, - stream_global->read_closed); - if (stream_global->incoming_sopb.nops == 0 && - state == stream_global->published_state) { - continue; - } - grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( - &stream_global->incoming_metadata, &stream_global->incoming_sopb, - &stream_global->outstanding_metadata); - grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); - stream_global->published_state = *stream_global->publish_state = state; - grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_done_closure, 1); - stream_global->recv_done_closure = NULL; - stream_global->publish_sopb = NULL; - stream_global->publish_state = NULL; - } -} - -static void cancel_from_api(grpc_chttp2_transport_global *transport_global, +static void cancel_from_api(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_status_code status) { - stream_global->cancelled = 1; - stream_global->cancelled_status = status; if (stream_global->id != 0) { gpr_slice_buffer_add( &transport_global->qbuf, @@ -957,11 +1012,89 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, stream_global->id, (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(status))); } - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, + NULL); + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, + 1); +} + +void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + grpc_status_code status, gpr_slice *slice) { + if (status != GRPC_STATUS_OK) { + stream_global->seen_error = 1; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (stream_global->published_trailing_metadata && + stream_global->recv_trailing_metadata_finished != NULL) { + /* last chance replacement: we've received trailing metadata, + but something more important has become available to signal + to the upper layers - drop what we've got, and then publish + what we want - which is safe because we haven't told anyone + about the metadata yet */ + grpc_chttp2_incoming_metadata_buffer_reset( + &stream_global->received_trailing_metadata); + stream_global->published_trailing_metadata = 0; + } + if (!stream_global->published_trailing_metadata) { + grpc_mdctx *mdctx = + TRANSPORT_FROM_GLOBAL(transport_global)->metadata_context; + char status_string[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(status, status_string); + grpc_chttp2_incoming_metadata_buffer_add( + &stream_global->received_trailing_metadata, + grpc_mdelem_from_strings(mdctx, "grpc-status", status_string)); + if (slice) { + grpc_chttp2_incoming_metadata_buffer_add( + &stream_global->received_trailing_metadata, + grpc_mdelem_from_metadata_strings( + mdctx, grpc_mdstr_from_string(mdctx, "grpc-message"), + grpc_mdstr_from_slice(mdctx, gpr_slice_ref(*slice)))); + } + stream_global->published_trailing_metadata = 1; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (slice) { + gpr_slice_unref(*slice); + } +} + +void grpc_chttp2_mark_stream_closed( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, int close_reads, + int close_writes) { + if (stream_global->read_closed && stream_global->write_closed) { + /* already closed */ + return; + } + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + if (close_reads && !stream_global->read_closed) { + stream_global->read_closed = 1; + stream_global->published_initial_metadata = 1; + stream_global->published_trailing_metadata = 1; + } + if (close_writes && !stream_global->write_closed) { + stream_global->write_closed = 1; + } + if (stream_global->read_closed && stream_global->write_closed) { + if (stream_global->id != 0 && + TRANSPORT_FROM_GLOBAL(transport_global)->parsing_active) { + grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, + stream_global); + } else { + if (stream_global->id != 0) { + remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), + stream_global->id); + } + stream_global->finished_close = 1; + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); + } + } } -static void close_from_api(grpc_chttp2_transport_global *transport_global, +static void close_from_api(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_status_code status, gpr_slice *optional_message) { @@ -973,10 +1106,7 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global, GPR_ASSERT(status >= 0 && (int)status < 100); - stream_global->cancelled = 1; - stream_global->cancelled_status = status; GPR_ASSERT(stream_global->id != 0); - GPR_ASSERT(!stream_global->written_anything); /* Hand roll a header block. This is unnecessarily ugly - at some point we should find a more elegant @@ -1059,23 +1189,30 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global, &transport_global->qbuf, grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR)); - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); + if (optional_message) { + gpr_slice_ref(*optional_message); + } + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, + optional_message); + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, + 1); } static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) { - cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE); + cancel_from_api(user_data, transport_global, stream_global, + GRPC_STATUS_UNAVAILABLE); } -static void end_all_the_calls(grpc_chttp2_transport *t) { - grpc_chttp2_for_all_streams(&t->global, NULL, cancel_stream_cb); +static void end_all_the_calls(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb); } static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { close_transport_locked(exec_ctx, t); - end_all_the_calls(t); + end_all_the_calls(exec_ctx, t); } /** update window from a settings change */ @@ -1086,12 +1223,11 @@ static void update_global_window(void *args, gpr_uint32 id, void *stream) { grpc_chttp2_stream_global *stream_global = &s->global; int was_zero; int is_zero; + gpr_int64 initial_window_update = t->parsing.initial_window_update; - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("settings", transport_global, stream_global, - outgoing_window, - t->parsing.initial_window_update); was_zero = stream_global->outgoing_window <= 0; - stream_global->outgoing_window += t->parsing.initial_window_update; + GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", transport_global, stream_global, + outgoing_window, initial_window_update); is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { @@ -1112,6 +1248,9 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { size_t i; int keep_reading = 0; grpc_chttp2_transport *t = tp; + grpc_chttp2_transport_global *transport_global = &t->global; + grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; + grpc_chttp2_stream_global *stream_global; GPR_TIMER_BEGIN("recv_data", 0); @@ -1123,11 +1262,11 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); - grpc_chttp2_prepare_to_read(&t->global, &t->parsing); + grpc_chttp2_prepare_to_read(transport_global, transport_parsing); gpr_mu_unlock(&t->mu); GPR_TIMER_BEGIN("recv_data.parse", 0); for (; i < t->read_buffer.count && - grpc_chttp2_perform_read(exec_ctx, &t->parsing, + grpc_chttp2_perform_read(exec_ctx, transport_parsing, t->read_buffer.slices[i]); i++) ; @@ -1139,16 +1278,28 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); - t->global.concurrent_stream_count = + transport_global->concurrent_stream_count = (gpr_uint32)grpc_chttp2_stream_map_size(&t->parsing_stream_map); - if (t->parsing.initial_window_update != 0) { + if (transport_parsing->initial_window_update != 0) { grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, update_global_window, t); - t->parsing.initial_window_update = 0; + transport_parsing->initial_window_update = 0; } /* handle higher level things */ - grpc_chttp2_publish_reads(exec_ctx, &t->global, &t->parsing); + grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); t->parsing_active = 0; + /* if a stream is in the stream map, and gets cancelled, we need to ensure + * we are not parsing before continuing the cancellation to keep things in + * a sane state */ + while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, + &stream_global)) { + GPR_ASSERT(stream_global->in_stream_map); + GPR_ASSERT(stream_global->write_closed); + GPR_ASSERT(stream_global->read_closed); + remove_stream(exec_ctx, t, stream_global->id); + stream_global->finished_close = 1; + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); + } } if (!success || i != t->read_buffer.count || t->closed) { drop_connection(exec_ctx, t); @@ -1206,35 +1357,216 @@ static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, } } +static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset *pollset) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + lock(t); + add_to_pollset_locked(exec_ctx, t, pollset); + unlock(exec_ctx, t); +} + /* - * TRACING + * BYTE STREAM */ -void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, - const char *context, const char *var, - int is_client, gpr_uint32 stream_id, - gpr_int64 current_value, gpr_int64 delta) { - char *identifier; - char *context_scope; - char *context_thread; - char *underscore_pos = strchr(context, '_'); - GPR_ASSERT(underscore_pos); - context_thread = gpr_strdup(underscore_pos + 1); - context_scope = gpr_strdup(context); - context_scope[underscore_pos - context] = 0; - if (stream_id) { - gpr_asprintf(&identifier, "%s[%d]", context_scope, stream_id); +static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + gpr_slice *slice, size_t max_size_hint, + grpc_closure *on_complete) { + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + grpc_chttp2_transport_global *transport_global = &bs->transport->global; + grpc_chttp2_stream_global *stream_global = &bs->stream->global; + gpr_uint32 max_recv_bytes; + + lock(bs->transport); + if (bs->is_tail) { + /* clamp max recv hint to an allowable size */ + if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) { + max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD; + } else { + max_recv_bytes = (gpr_uint32)max_size_hint; + } + + /* account for bytes already received but unknown to higher layers */ + if (max_recv_bytes >= bs->slices.length) { + max_recv_bytes -= (gpr_uint32)bs->slices.length; + } else { + max_recv_bytes = 0; + } + /* add some small lookahead to keep pipelines flowing */ + GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD); + max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD; + if (stream_global->max_recv_bytes < max_recv_bytes) { + gpr_uint32 add_max_recv_bytes = + max_recv_bytes - stream_global->max_recv_bytes; + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + max_recv_bytes, add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + unannounced_incoming_window_for_parse, + add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + unannounced_incoming_window_for_writing, + add_max_recv_bytes); + grpc_chttp2_list_add_unannounced_incoming_window_available( + transport_global, stream_global); + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + } + } + if (bs->slices.count > 0) { + *slice = gpr_slice_buffer_take_first(&bs->slices); + unlock(exec_ctx, bs->transport); + return 1; + } else { + bs->on_next = on_complete; + bs->next = slice; + unlock(exec_ctx, bs->transport); + return 0; + } +} + +static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) { + if (gpr_unref(&bs->refs)) { + gpr_slice_buffer_destroy(&bs->slices); + gpr_free(bs); + } +} + +static void incoming_byte_stream_destroy(grpc_byte_stream *byte_stream) { + incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream); +} + +void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + gpr_slice slice) { + gpr_mu_lock(&bs->transport->mu); + if (bs->on_next != NULL) { + *bs->next = slice; + grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 1); + bs->on_next = NULL; + } else { + gpr_slice_buffer_add(&bs->slices, slice); + } + gpr_mu_unlock(&bs->transport->mu); +} + +void grpc_chttp2_incoming_byte_stream_finished( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { + incoming_byte_stream_unref(bs); +} + +grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( + grpc_chttp2_transport_parsing *transport_parsing, + grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size, + gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) { + grpc_chttp2_incoming_byte_stream *incoming_byte_stream = + gpr_malloc(sizeof(*incoming_byte_stream)); + incoming_byte_stream->base.length = frame_size; + incoming_byte_stream->base.flags = flags; + incoming_byte_stream->base.next = incoming_byte_stream_next; + incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; + gpr_ref_init(&incoming_byte_stream->refs, 2); + incoming_byte_stream->next_message = NULL; + incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing); + incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing); + gpr_slice_buffer_init(&incoming_byte_stream->slices); + incoming_byte_stream->on_next = NULL; + incoming_byte_stream->is_tail = 1; + if (add_to_queue->head == NULL) { + add_to_queue->head = incoming_byte_stream; } else { - identifier = gpr_strdup(context_scope); - } - gpr_log(GPR_INFO, - "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]", - is_client ? "client" : "server", identifier, context_thread, var, - current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta, - current_value + delta, reason, file, line); - gpr_free(identifier); - gpr_free(context_thread); - gpr_free(context_scope); + add_to_queue->tail->is_tail = 0; + add_to_queue->tail->next_message = incoming_byte_stream; + } + add_to_queue->tail = incoming_byte_stream; + return incoming_byte_stream; +} + +/* + * TRACING + */ + +static char *format_flowctl_context_var(const char *context, const char *var, + gpr_int64 val, gpr_uint32 id, + char **scope) { + char *underscore_pos; + char *result; + if (context == NULL) { + *scope = NULL; + gpr_asprintf(&result, "%s(%lld)", var, val); + return result; + } + underscore_pos = strchr(context, '_'); + *scope = gpr_strdup(context); + (*scope)[underscore_pos - context] = 0; + if (id != 0) { + char *tmp = *scope; + gpr_asprintf(scope, "%s[%d]", tmp, id); + gpr_free(tmp); + } + gpr_asprintf(&result, "%s.%s(%lld)", underscore_pos + 1, var, val); + return result; +} + +static int samestr(char *a, char *b) { + if (a == NULL) { + return b == NULL; + } + if (b == NULL) { + return 0; + } + return 0 == strcmp(a, b); +} + +void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, + grpc_chttp2_flowctl_op op, const char *context1, + const char *var1, const char *context2, + const char *var2, int is_client, + gpr_uint32 stream_id, gpr_int64 val1, + gpr_int64 val2) { + char *scope1; + char *scope2; + char *label1 = + format_flowctl_context_var(context1, var1, val1, stream_id, &scope1); + char *label2 = + format_flowctl_context_var(context2, var2, val2, stream_id, &scope2); + char *clisvr = is_client ? "client" : "server"; + char *prefix; + + gpr_asprintf(&prefix, "FLOW % 8s: %s % 11s ", phase, clisvr, scope1); + + switch (op) { + case GRPC_CHTTP2_FLOWCTL_MOVE: + GPR_ASSERT(samestr(scope1, scope2)); + if (val2 != 0) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "%sMOVE % 40s <- % 40s giving %d", prefix, label1, label2, + val1 + val2); + } + break; + case GRPC_CHTTP2_FLOWCTL_CREDIT: + GPR_ASSERT(val2 >= 0); + if (val2 != 0) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "%sCREDIT % 40s by % 40s giving %d", prefix, label1, label2, + val1 + val2); + } + break; + case GRPC_CHTTP2_FLOWCTL_DEBIT: + GPR_ASSERT(val2 >= 0); + if (val2 != 0) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "%sDEBIT % 40s by % 40s giving %d", prefix, label1, label2, + val1 - val2); + } + break; + } + + gpr_free(scope1); + gpr_free(scope2); + gpr_free(label1); + gpr_free(label2); + gpr_free(prefix); } /* @@ -1246,7 +1578,7 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { } static const grpc_transport_vtable vtable = { - sizeof(grpc_chttp2_stream), init_stream, perform_stream_op, + sizeof(grpc_chttp2_stream), init_stream, set_pollset, perform_stream_op, perform_transport_op, destroy_stream, destroy_transport, chttp2_get_peer}; grpc_transport *grpc_create_chttp2_transport( |