diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.cc')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 841 |
1 files changed, 423 insertions, 418 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 02fc53122d..034e6ed8ca 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -99,94 +99,94 @@ grpc_tracer_flag grpc_trace_chttp2_refcount = #endif /* forward declarations of various callbacks that we'll build closures around */ -static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); -static void write_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); +static void write_action_begin_locked(grpc_exec_ctx* exec_ctx, void* t, + grpc_error* error); +static void write_action(grpc_exec_ctx* exec_ctx, void* t, grpc_error* error); +static void write_action_end_locked(grpc_exec_ctx* exec_ctx, void* t, + grpc_error* error); -static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); +static void read_action_locked(grpc_exec_ctx* exec_ctx, void* t, + grpc_error* error); -static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, - grpc_error *error); +static void complete_fetch_locked(grpc_exec_ctx* exec_ctx, void* gs, + grpc_error* error); /** Set a transport level setting, and push it to our peer */ -static void queue_setting_update(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void queue_setting_update(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_setting_id id, uint32_t value); -static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, grpc_error *error); +static void close_from_api(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_chttp2_stream* s, grpc_error* error); /** Start new streams that have been created if we can */ -static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); +static void maybe_start_some_streams(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); -static void connectivity_state_set(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void connectivity_state_set(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_connectivity_state state, - grpc_error *error, const char *reason); + grpc_error* error, const char* reason); -static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, - void *byte_stream, - grpc_error *error_ignored); +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx* exec_ctx, + void* byte_stream, + grpc_error* error_ignored); static void incoming_byte_stream_publish_error( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error); -static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs); - -static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); -static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); - -static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); -static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); - -static void close_transport_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_error *error); -static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error); - -static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); -static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error); -static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error); -static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, - void *tp, grpc_error *error); - -static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error); -static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_closure *on_initiate, - grpc_closure *on_complete); -static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error); + grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs, + grpc_error* error); +static void incoming_byte_stream_unref(grpc_exec_ctx* exec_ctx, + grpc_chttp2_incoming_byte_stream* bs); + +static void benign_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* t, + grpc_error* error); +static void destructive_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* t, + grpc_error* error); + +static void post_benign_reclaimer(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static void post_destructive_reclaimer(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); + +static void close_transport_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_error* error); +static void end_all_the_calls(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_error* error); + +static void schedule_bdp_ping_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static void start_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error); +static void finish_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error); +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx* exec_ctx, + void* tp, grpc_error* error); + +static void cancel_pings(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_error* error); +static void send_ping_locked(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_closure* on_initiate, + grpc_closure* on_complete); +static void retry_initiate_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error); /** keepalive-relevant functions */ -static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); - -static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); +static void init_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); +static void start_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); +static void finish_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); +static void keepalive_watchdog_fired_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); + +static void reset_byte_stream(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ -static void destruct_transport(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static void destruct_transport(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { size_t i; grpc_endpoint_destroy(exec_ctx, t->ep); @@ -216,7 +216,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); while (t->write_cb_pool) { - grpc_chttp2_write_cb *next = t->write_cb_pool->next; + grpc_chttp2_write_cb* next = t->write_cb_pool->next; gpr_free(t->write_cb_pool); t->write_cb_pool = next; } @@ -230,9 +230,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, } #ifndef NDEBUG -void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, const char *reason, - const char *file, int line) { +void grpc_chttp2_unref_transport(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, const char* reason, + const char* file, int line) { if (GRPC_TRACER_ON(grpc_trace_chttp2_refcount)) { gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count); gpr_log(GPR_DEBUG, "chttp2:unref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]", @@ -242,8 +242,8 @@ void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, destruct_transport(exec_ctx, t); } -void grpc_chttp2_ref_transport(grpc_chttp2_transport *t, const char *reason, - const char *file, int line) { +void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason, + const char* file, int line) { if (GRPC_TRACER_ON(grpc_trace_chttp2_refcount)) { gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count); gpr_log(GPR_DEBUG, "chttp2: ref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]", @@ -252,20 +252,20 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t, const char *reason, gpr_ref(&t->refs); } #else -void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +void grpc_chttp2_unref_transport(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { if (!gpr_unref(&t->refs)) return; destruct_transport(exec_ctx, t); } -void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } +void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } #endif -static const grpc_transport_vtable *get_vtable(void); +static const grpc_transport_vtable* get_vtable(void); -static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - const grpc_channel_args *channel_args, - grpc_endpoint *ep, bool is_client) { +static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + const grpc_channel_args* channel_args, + grpc_endpoint* ep, bool is_client) { size_t i; int j; @@ -494,7 +494,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } else { static const struct { - const char *channel_arg_name; + const char* channel_arg_name; grpc_chttp2_setting_id setting_id; grpc_integer_options integer_options; bool availability[2] /* server, client */; @@ -580,9 +580,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, post_benign_reclaimer(exec_ctx, t); } -static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; +static void destroy_transport_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; t->destroying = 1; close_transport_locked( exec_ctx, t, @@ -592,17 +592,17 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destroy"); } -static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; +static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(destroy_transport_locked, t, grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); } -static void close_transport_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_error *error) { +static void close_transport_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_error* error) { end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); if (t->closed_with_error == GRPC_ERROR_NONE) { @@ -645,7 +645,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, } /* flush writable stream list to avoid dangling references */ - grpc_chttp2_stream *s; + grpc_chttp2_stream* s; while (grpc_chttp2_list_pop_writable_stream(t, &s)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } @@ -656,28 +656,28 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, } #ifndef NDEBUG -void grpc_chttp2_stream_ref(grpc_chttp2_stream *s, const char *reason) { +void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) { grpc_stream_ref(s->refcount, reason); } -void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s, - const char *reason) { +void grpc_chttp2_stream_unref(grpc_exec_ctx* exec_ctx, grpc_chttp2_stream* s, + const char* reason) { grpc_stream_unref(exec_ctx, s->refcount, reason); } #else -void grpc_chttp2_stream_ref(grpc_chttp2_stream *s) { +void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) { grpc_stream_ref(s->refcount); } -void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { +void grpc_chttp2_stream_unref(grpc_exec_ctx* exec_ctx, grpc_chttp2_stream* s) { grpc_stream_unref(exec_ctx, s->refcount); } #endif -static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data, gpr_arena *arena) { +static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena) { GPR_TIMER_BEGIN("init_stream", 0); - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; + grpc_chttp2_stream* s = (grpc_chttp2_stream*)gs; s->t = t; s->refcount = refcount; @@ -717,10 +717,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, return 0; } -static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, - grpc_error *error) { - grpc_chttp2_stream *s = (grpc_chttp2_stream *)sp; - grpc_chttp2_transport *t = s->t; +static void destroy_stream_locked(grpc_exec_ctx* exec_ctx, void* sp, + grpc_error* error) { + grpc_chttp2_stream* s = (grpc_chttp2_stream*)sp; + grpc_chttp2_transport* t = s->t; GPR_TIMER_BEGIN("destroy_stream", 0); @@ -771,12 +771,12 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GRPC_CLOSURE_SCHED(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE); } -static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, - grpc_closure *then_schedule_closure) { +static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, + grpc_closure* then_schedule_closure) { GPR_TIMER_BEGIN("destroy_stream", 0); - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; + grpc_chttp2_stream* s = (grpc_chttp2_stream*)gs; if (s->stream_compression_ctx != NULL) { grpc_stream_compression_context_destroy(s->stream_compression_ctx); @@ -789,29 +789,30 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->destroy_stream_arg = then_schedule_closure; GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, - grpc_combiner_scheduler(t->combiner)), + exec_ctx, + GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, + grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("destroy_stream", 0); } -grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t, +grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport* t, uint32_t id) { - return (grpc_chttp2_stream *)grpc_chttp2_stream_map_find(&t->stream_map, id); + return (grpc_chttp2_stream*)grpc_chttp2_stream_map_find(&t->stream_map, id); } -grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, uint32_t id) { if (t->channel_callback.accept_stream == NULL) { return NULL; } - grpc_chttp2_stream *accepting; + grpc_chttp2_stream* accepting; GPR_ASSERT(t->accepting_stream == NULL); t->accepting_stream = &accepting; t->channel_callback.accept_stream(exec_ctx, t->channel_callback.accept_stream_user_data, - &t->base, (void *)(uintptr_t)id); + &t->base, (void*)(uintptr_t)id); t->accepting_stream = NULL; return accepting; } @@ -820,7 +821,7 @@ grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx, * OUTPUT PROCESSING */ -static const char *write_state_name(grpc_chttp2_write_state st) { +static const char* write_state_name(grpc_chttp2_write_state st) { switch (st) { case GRPC_CHTTP2_WRITE_STATE_IDLE: return "IDLE"; @@ -832,8 +833,8 @@ static const char *write_state_name(grpc_chttp2_write_state st) { GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_write_state st, const char *reason) { +static void set_write_state(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_chttp2_write_state st, const char* reason) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s [%s]", t, t->is_client ? "CLIENT" : "SERVER", write_state_name(t->write_state), @@ -842,7 +843,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { GRPC_CLOSURE_LIST_SCHED(exec_ctx, &t->run_after_write); if (t->close_transport_on_writes_finished != NULL) { - grpc_error *err = t->close_transport_on_writes_finished; + grpc_error* err = t->close_transport_on_writes_finished; t->close_transport_on_writes_finished = NULL; close_transport_locked(exec_ctx, t, err); } @@ -850,7 +851,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } static void inc_initiate_write_reason( - grpc_exec_ctx *exec_ctx, grpc_chttp2_initiate_write_reason reason) { + grpc_exec_ctx* exec_ctx, grpc_chttp2_initiate_write_reason reason) { switch (reason) { case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx); @@ -921,8 +922,8 @@ static void inc_initiate_write_reason( } } -void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_initiate_write(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_initiate_write_reason reason) { GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0); @@ -950,16 +951,16 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } -void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { +void grpc_chttp2_mark_stream_writable(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_chttp2_stream* s) { if (t->closed_with_error == GRPC_ERROR_NONE && grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); } } -static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t, +static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t, bool early_results_scheduled, bool partial_write) { /* if it's not the first write in a batch, always offload to the executor: @@ -987,7 +988,7 @@ static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t, } #define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i)) -static const char *begin_writing_desc(bool partial, bool inlined) { +static const char* begin_writing_desc(bool partial, bool inlined) { switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) { case WRITE_STATE_TUPLE_TO_INT(false, false): return "begin write in background"; @@ -1001,10 +1002,10 @@ static const char *begin_writing_desc(bool partial, bool inlined) { GPR_UNREACHABLE_CODE(return "bad state tuple"); } -static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, - grpc_error *error_ignored) { +static void write_action_begin_locked(grpc_exec_ctx* exec_ctx, void* gt, + grpc_error* error_ignored) { GPR_TIMER_BEGIN("write_action_begin_locked", 0); - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; if (t->closed_with_error != GRPC_ERROR_NONE) { @@ -1019,18 +1020,20 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, if (!t->is_first_write_in_batch) { GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(exec_ctx); } - grpc_closure_scheduler *scheduler = + grpc_closure_scheduler* scheduler = write_scheduler(t, r.early_results_scheduled, r.partial); if (scheduler != grpc_schedule_on_exec_ctx) { GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(exec_ctx); } set_write_state( - exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE - : GRPC_CHTTP2_WRITE_STATE_WRITING, + exec_ctx, t, + r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE + : GRPC_CHTTP2_WRITE_STATE_WRITING, begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx)); - GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_INIT(&t->write_action, - write_action, t, scheduler), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, + GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler), + GRPC_ERROR_NONE); } else { GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(exec_ctx); set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, @@ -1040,8 +1043,8 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, GPR_TIMER_END("write_action_begin_locked", 0); } -static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; +static void write_action(grpc_exec_ctx* exec_ctx, void* gt, grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; GPR_TIMER_BEGIN("write_action", 0); grpc_endpoint_write( exec_ctx, t->ep, &t->outbuf, @@ -1050,10 +1053,10 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { GPR_TIMER_END("write_action", 0); } -static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { +static void write_action_end_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { GPR_TIMER_BEGIN("terminate_writing_with_lock", 0); - grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; + grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; if (error != GRPC_ERROR_NONE) { close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); @@ -1098,10 +1101,10 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, // Dirties an HTTP2 setting to be sent out next time a writing path occurs. // If the change needs to occur immediately, manually initiate a write. -static void queue_setting_update(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void queue_setting_update(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_setting_id id, uint32_t value) { - const grpc_chttp2_setting_parameters *sp = + const grpc_chttp2_setting_parameters* sp = &grpc_chttp2_settings_parameters[id]; uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value); if (use_value != value) { @@ -1114,8 +1117,8 @@ static void queue_setting_update(grpc_exec_ctx *exec_ctx, } } -void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, uint32_t goaway_error, grpc_slice goaway_text) { // GRPC_CHTTP2_IF_TRACING( @@ -1151,9 +1154,9 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, "got_goaway"); } -static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { - grpc_chttp2_stream *s; +static void maybe_start_some_streams(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { + grpc_chttp2_stream* s; /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ while (t->next_stream_id <= MAX_CLIENT_STREAM_ID && @@ -1204,24 +1207,24 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, bits being used for flags defined above) */ #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) -static grpc_closure *add_closure_barrier(grpc_closure *closure) { +static grpc_closure* add_closure_barrier(grpc_closure* closure) { closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT; return closure; } -static void null_then_run_closure(grpc_exec_ctx *exec_ctx, - grpc_closure **closure, grpc_error *error) { - grpc_closure *c = *closure; +static void null_then_run_closure(grpc_exec_ctx* exec_ctx, + grpc_closure** closure, grpc_error* error) { + grpc_closure* c = *closure; *closure = NULL; GRPC_CLOSURE_RUN(exec_ctx, c, error); } -void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - grpc_closure **pclosure, - grpc_error *error, const char *desc) { - grpc_closure *closure = *pclosure; +void grpc_chttp2_complete_closure_step(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_chttp2_stream* s, + grpc_closure** pclosure, + grpc_error* error, const char* desc) { + grpc_closure* closure = *pclosure; *pclosure = NULL; if (closure == NULL) { GRPC_ERROR_UNREF(error); @@ -1229,7 +1232,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, } closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; if (GRPC_TRACER_ON(grpc_http_trace)) { - const char *errstr = grpc_error_string(error); + const char* errstr = grpc_error_string(error); gpr_log( GPR_DEBUG, "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s " @@ -1265,7 +1268,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, } } -static bool contains_non_ok_status(grpc_metadata_batch *batch) { +static bool contains_non_ok_status(grpc_metadata_batch* batch) { if (batch->idx.named.grpc_status != NULL) { return !grpc_mdelem_eq(batch->idx.named.grpc_status->md, GRPC_MDELEM_GRPC_STATUS_0); @@ -1273,9 +1276,9 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) { return false; } -static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { +static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_chttp2_stream* s) { if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > t->write_buffer_size)) { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); @@ -1284,18 +1287,18 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, } } -static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { +static void add_fetched_slice_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_chttp2_stream* s) { s->fetched_send_message_length += (uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice); grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); maybe_become_writable_due_to_send_msg(exec_ctx, t, s); } -static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { +static void continue_fetching_send_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_chttp2_stream* s) { for (;;) { if (s->fetching_send_message == NULL) { /* Stream was cancelled before message fetch completed */ @@ -1310,16 +1313,16 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, "fetching_send_message_finished"); } else { - grpc_chttp2_write_cb *cb = t->write_cb_pool; + grpc_chttp2_write_cb* cb = t->write_cb_pool; if (cb == NULL) { - cb = (grpc_chttp2_write_cb *)gpr_malloc(sizeof(*cb)); + cb = (grpc_chttp2_write_cb*)gpr_malloc(sizeof(*cb)); } else { t->write_cb_pool = cb->next; } cb->call_at_byte = notify_offset; cb->closure = s->fetching_send_message_finished; s->fetching_send_message_finished = NULL; - grpc_chttp2_write_cb **list = + grpc_chttp2_write_cb** list = s->fetching_send_message->flags & GRPC_WRITE_THROUGH ? &s->on_write_finished_cbs : &s->on_flow_controlled_cbs; @@ -1330,7 +1333,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, UINT32_MAX, &s->complete_fetch_locked)) { - grpc_error *error = grpc_byte_stream_pull( + grpc_error* error = grpc_byte_stream_pull( exec_ctx, s->fetching_send_message, &s->fetching_slice); if (error != GRPC_ERROR_NONE) { grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); @@ -1342,10 +1345,10 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, } } -static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, - grpc_error *error) { - grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - grpc_chttp2_transport *t = s->t; +static void complete_fetch_locked(grpc_exec_ctx* exec_ctx, void* gs, + grpc_error* error) { + grpc_chttp2_stream* s = (grpc_chttp2_stream*)gs; + grpc_chttp2_transport* t = s->t; if (error == GRPC_ERROR_NONE) { error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, &s->fetching_slice); @@ -1360,14 +1363,14 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, } } -static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} -static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, +static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, bool is_client, bool is_initial) { - for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL; + for (grpc_linked_mdelem* md = md_batch->list.head; md != NULL; md = md->next) { - char *key = grpc_slice_to_c_string(GRPC_MDKEY(md->md)); - char *value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md)); + char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md)); + char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md)); gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL", is_client ? "CLI" : "SVR", key, value); gpr_free(key); @@ -1375,20 +1378,20 @@ static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, } } -static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, - grpc_error *error_ignored) { +static void perform_stream_op_locked(grpc_exec_ctx* exec_ctx, void* stream_op, + grpc_error* error_ignored) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); - grpc_transport_stream_op_batch *op = - (grpc_transport_stream_op_batch *)stream_op; - grpc_chttp2_stream *s = (grpc_chttp2_stream *)op->handler_private.extra_arg; - grpc_transport_stream_op_batch_payload *op_payload = op->payload; - grpc_chttp2_transport *t = s->t; + grpc_transport_stream_op_batch* op = + (grpc_transport_stream_op_batch*)stream_op; + grpc_chttp2_stream* s = (grpc_chttp2_stream*)op->handler_private.extra_arg; + grpc_transport_stream_op_batch_payload* op_payload = op->payload; + grpc_chttp2_transport* t = s->t; GRPC_STATS_INC_HTTP2_OP_BATCHES(exec_ctx); if (GRPC_TRACER_ON(grpc_http_trace)) { - char *str = grpc_transport_stream_op_batch_string(op); + char* str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str, op->on_complete); gpr_free(str); @@ -1402,7 +1405,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } - grpc_closure *on_complete = op->on_complete; + grpc_closure* on_complete = op->on_complete; if (on_complete == NULL) { on_complete = GRPC_CLOSURE_CREATE(do_nothing, NULL, grpc_schedule_on_exec_ctx); @@ -1530,7 +1533,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, "fetching_send_message_finished"); } else { GPR_ASSERT(s->fetching_send_message == NULL); - uint8_t *frame_hdr = grpc_slice_buffer_tiny_add( + uint8_t* frame_hdr = grpc_slice_buffer_tiny_add( &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES); uint32_t flags = op_payload->send_message.send_message->flags; frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; @@ -1657,12 +1660,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "perform_stream_op"); } -static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, - grpc_transport_stream_op_batch *op) { +static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, + grpc_transport_stream_op_batch* op) { GPR_TIMER_BEGIN("perform_stream_op", 0); - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; + grpc_chttp2_stream* s = (grpc_chttp2_stream*)gs; if (!t->is_client) { if (op->send_initial_metadata) { @@ -1678,7 +1681,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } if (GRPC_TRACER_ON(grpc_http_trace)) { - char *str = grpc_transport_stream_op_batch_string(op); + char* str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op[s=%p]: %s", s, str); gpr_free(str); } @@ -1693,11 +1696,11 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_TIMER_END("perform_stream_op", 0); } -static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error) { +static void cancel_pings(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_error* error) { /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ - grpc_chttp2_ping_queue *pq = &t->ping_queue; + grpc_chttp2_ping_queue* pq = &t->ping_queue; GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); @@ -1706,24 +1709,24 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_closure *on_initiate, grpc_closure *on_ack) { +static void send_ping_locked(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_closure* on_initiate, grpc_closure* on_ack) { if (t->closed_with_error != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, on_initiate, GRPC_ERROR_REF(t->closed_with_error)); GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_REF(t->closed_with_error)); return; } - grpc_chttp2_ping_queue *pq = &t->ping_queue; + grpc_chttp2_ping_queue* pq = &t->ping_queue; grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, GRPC_ERROR_NONE); } -static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; +static void retry_initiate_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; t->ping_state.is_delayed_ping_timer_set = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_initiate_write(exec_ctx, t, @@ -1731,11 +1734,11 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, } } -void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, +void grpc_chttp2_ack_ping(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, uint64_t id) { - grpc_chttp2_ping_queue *pq = &t->ping_queue; + grpc_chttp2_ping_queue* pq = &t->ping_queue; if (pq->inflight_id != id) { - char *from = grpc_endpoint_get_peer(t->ep); + char* from = grpc_endpoint_get_peer(t->ep); gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id); gpr_free(from); return; @@ -1747,8 +1750,8 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } -static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error) { +static void send_goaway(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_error* error) { t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED; grpc_http2_error_code http_error; grpc_slice slice; @@ -1761,8 +1764,8 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +void grpc_chttp2_add_ping_strike(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { t->ping_recv_state.ping_strikes++; if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes && t->ping_policy.max_ping_strikes != 0) { @@ -1772,19 +1775,20 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); /*The transport will be closed after the write is done */ close_transport_locked( - exec_ctx, t, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + exec_ctx, t, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } -static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, - void *stream_op, - grpc_error *error_ignored) { - grpc_transport_op *op = (grpc_transport_op *)stream_op; - grpc_chttp2_transport *t = - (grpc_chttp2_transport *)op->handler_private.extra_arg; - grpc_error *close_transport = op->disconnect_with_error; +static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx, + void* stream_op, + grpc_error* error_ignored) { + grpc_transport_op* op = (grpc_transport_op*)stream_op; + grpc_chttp2_transport* t = + (grpc_chttp2_transport*)op->handler_private.extra_arg; + grpc_error* close_transport = op->disconnect_with_error; if (op->goaway_error) { send_goaway(exec_ctx, t, op->goaway_error); @@ -1825,10 +1829,10 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "transport_op"); } -static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_transport_op *op) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - char *msg = grpc_transport_op_string(op); +static void perform_transport_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_transport_op* op) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; + char* msg = grpc_transport_op_string(op); gpr_free(msg); op->handler_private.extra_arg = gt; GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); @@ -1843,9 +1847,9 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * INPUT PROCESSING - GENERAL */ -void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { +void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_chttp2_stream* s) { if (s->recv_initial_metadata_ready != NULL && s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) { if (s->seen_error) { @@ -1862,10 +1866,10 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, } } -void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - grpc_error *error = GRPC_ERROR_NONE; +void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_chttp2_stream* s) { + grpc_error* error = GRPC_ERROR_NONE; if (s->recv_message_ready != NULL) { *s->recv_message = NULL; if (s->final_metadata_requested && s->seen_error) { @@ -1946,9 +1950,9 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, } } -void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { +void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_chttp2_stream* s) { grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); if (s->recv_trailing_metadata_finished != NULL && s->read_closed && s->write_closed) { @@ -2000,10 +2004,10 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } } -static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - uint32_t id, grpc_error *error) { - grpc_chttp2_stream *s = - (grpc_chttp2_stream *)grpc_chttp2_stream_map_delete(&t->stream_map, id); +static void remove_stream(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + uint32_t id, grpc_error* error) { + grpc_chttp2_stream* s = + (grpc_chttp2_stream*)grpc_chttp2_stream_map_delete(&t->stream_map, id); GPR_ASSERT(s); if (t->incoming_stream == s) { t->incoming_stream = NULL; @@ -2011,7 +2015,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } if (s->pending_byte_stream) { if (s->on_next != NULL) { - grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame; + grpc_chttp2_incoming_byte_stream* bs = s->data_parser.parsing_frame; if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } @@ -2042,9 +2046,9 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, maybe_start_some_streams(exec_ctx, t); } -void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_error *due_to_error) { +void grpc_chttp2_cancel_stream(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s, + grpc_error* due_to_error) { if (!t->is_client && !s->sent_trailing_metadata && grpc_error_has_clear_grpc_status(due_to_error)) { close_from_api(exec_ctx, t, s, due_to_error); @@ -2069,8 +2073,8 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, due_to_error); } -void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, grpc_error *error) { +void grpc_chttp2_fake_status(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_chttp2_stream* s, grpc_error* error) { grpc_status_code status; grpc_slice slice; grpc_error_get_status(exec_ctx, error, s->deadline, &status, &slice, NULL); @@ -2109,7 +2113,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) { +static void add_error(grpc_error* error, grpc_error** refs, size_t* nrefs) { if (error == GRPC_ERROR_NONE) return; for (size_t i = 0; i < *nrefs; i++) { if (error == refs[i]) { @@ -2120,14 +2124,14 @@ static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) { ++*nrefs; } -static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, - const char *master_error_msg) { - grpc_error *refs[3]; +static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s, + const char* master_error_msg) { + grpc_error* refs[3]; size_t nrefs = 0; add_error(s->read_closed_error, refs, &nrefs); add_error(s->write_closed_error, refs, &nrefs); add_error(extra_error, refs, &nrefs); - grpc_error *error = GRPC_ERROR_NONE; + grpc_error* error = GRPC_ERROR_NONE; if (nrefs > 0) { error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(master_error_msg, refs, nrefs); @@ -2136,11 +2140,11 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, return error; } -static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, grpc_chttp2_write_cb **list, - grpc_error *error) { +static void flush_write_list(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_chttp2_stream* s, grpc_chttp2_write_cb** list, + grpc_error* error) { while (*list) { - grpc_chttp2_write_cb *cb = *list; + grpc_chttp2_write_cb* cb = *list; *list = cb->next; grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, GRPC_ERROR_REF(error), @@ -2151,9 +2155,9 @@ static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, grpc_error *error) { +void grpc_chttp2_fail_pending_writes(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_chttp2_stream* s, grpc_error* error) { error = removal_error(error, s, "Pending writes failed due to stream closure"); s->send_initial_metadata = NULL; @@ -2175,10 +2179,10 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error); } -void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, int close_reads, - int close_writes, grpc_error *error) { +void grpc_chttp2_mark_stream_closed(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, + grpc_chttp2_stream* s, int close_reads, + int close_writes, grpc_error* error) { if (s->read_closed && s->write_closed) { /* already closed */ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); @@ -2199,7 +2203,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, } if (s->read_closed && s->write_closed) { became_closed = true; - grpc_error *overall_error = + grpc_error* overall_error = removal_error(GRPC_ERROR_REF(error), s, "Stream removed"); if (s->id != 0) { remove_stream(exec_ctx, t, s->id, GRPC_ERROR_REF(overall_error)); @@ -2227,14 +2231,14 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, grpc_error *error) { +static void close_from_api(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_chttp2_stream* s, grpc_error* error) { grpc_slice hdr; grpc_slice status_hdr; grpc_slice http_status_hdr; grpc_slice content_type_hdr; grpc_slice message_pfx; - uint8_t *p; + uint8_t* p; uint32_t len = 0; grpc_status_code grpc_status; grpc_slice slice; @@ -2387,20 +2391,20 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } typedef struct { - grpc_exec_ctx *exec_ctx; - grpc_error *error; - grpc_chttp2_transport *t; + grpc_exec_ctx* exec_ctx; + grpc_error* error; + grpc_chttp2_transport* t; } cancel_stream_cb_args; -static void cancel_stream_cb(void *user_data, uint32_t key, void *stream) { - cancel_stream_cb_args *args = (cancel_stream_cb_args *)user_data; - grpc_chttp2_stream *s = (grpc_chttp2_stream *)stream; +static void cancel_stream_cb(void* user_data, uint32_t key, void* stream) { + cancel_stream_cb_args* args = (cancel_stream_cb_args*)user_data; + grpc_chttp2_stream* s = (grpc_chttp2_stream*)stream; grpc_chttp2_cancel_stream(args->exec_ctx, args->t, s, GRPC_ERROR_REF(args->error)); } -static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error) { +static void end_all_the_calls(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_error* error) { cancel_stream_cb_args args = {exec_ctx, error, t}; grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args); GRPC_ERROR_UNREF(error); @@ -2411,7 +2415,7 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, */ template <class F> -static void WithUrgency(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, +static void WithUrgency(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, grpc_core::chttp2::FlowControlAction::Urgency urgency, grpc_chttp2_initiate_write_reason reason, F action) { switch (urgency) { @@ -2427,8 +2431,8 @@ static void WithUrgency(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } void grpc_chttp2_act_on_flowctl_action( - grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action, - grpc_chttp2_transport *t, grpc_chttp2_stream *s) { + grpc_exec_ctx* exec_ctx, const grpc_core::chttp2::FlowControlAction& action, + grpc_chttp2_transport* t, grpc_chttp2_stream* s) { WithUrgency( exec_ctx, t, action.send_stream_update(), GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, @@ -2450,17 +2454,17 @@ void grpc_chttp2_act_on_flowctl_action( }); } -static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static grpc_error* try_http_parsing(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { grpc_http_parser parser; size_t i = 0; - grpc_error *error = GRPC_ERROR_NONE; + grpc_error* error = GRPC_ERROR_NONE; grpc_http_response response; memset(&response, 0, sizeof(response)); grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response); - grpc_error *parse_error = GRPC_ERROR_NONE; + grpc_error* parse_error = GRPC_ERROR_NONE; for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) { parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i], NULL); @@ -2480,27 +2484,27 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, return error; } -static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { +static void read_action_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { GPR_TIMER_BEGIN("reading_action_locked", 0); - grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; + grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; GRPC_ERROR_REF(error); - grpc_error *err = error; + grpc_error* err = error; if (err != GRPC_ERROR_NONE) { err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Endpoint read failed", &err, 1), GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state); } - GPR_SWAP(grpc_error *, err, error); + GPR_SWAP(grpc_error*, err, error); GRPC_ERROR_UNREF(err); if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; - grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, + grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { t->flow_control->bdp_estimator()->AddIncomingBytes( @@ -2522,7 +2526,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_BEGIN("post_parse_locked", 0); if (t->initial_window_update != 0) { if (t->initial_window_update > 0) { - grpc_chttp2_stream *s; + grpc_chttp2_stream* s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); grpc_chttp2_initiate_write( @@ -2569,16 +2573,16 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, // t is reffed prior to calling the first time, and once the callback chain // that kicks off finishes, it's unreffed -static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static void schedule_bdp_ping_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { t->flow_control->bdp_estimator()->SchedulePing(); send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); } -static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; +static void start_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string, grpc_error_string(error)); @@ -2590,9 +2594,9 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, t->flow_control->bdp_estimator()->StartPing(); } -static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; +static void finish_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string, grpc_error_string(error)); @@ -2611,9 +2615,9 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, &t->next_bdp_ping_timer_expired_locked); } -static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, - void *tp, grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx* exec_ctx, + void* tp, grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; GPR_ASSERT(t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = false; if (error != GRPC_ERROR_NONE) { @@ -2623,7 +2627,7 @@ static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, schedule_bdp_ping_locked(exec_ctx, t); } -void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, +void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, bool is_client) { size_t i; if (args) { @@ -2681,9 +2685,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, } } -static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; +static void init_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; @@ -2712,18 +2716,18 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); } -static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; +static void start_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(exec_ctx, &t->keepalive_watchdog_timer, grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, &t->keepalive_watchdog_fired_locked); } -static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; +static void finish_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; @@ -2737,9 +2741,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); } -static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; +static void keepalive_watchdog_fired_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; @@ -2764,10 +2768,10 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, * CALLBACK LOOP */ -static void connectivity_state_set(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void connectivity_state_set(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_connectivity_state state, - grpc_error *error, const char *reason) { + grpc_error* error, const char* reason) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); grpc_connectivity_state_set(exec_ctx, &t->channel_callback.state_tracker, @@ -2778,15 +2782,15 @@ static void connectivity_state_set(grpc_exec_ctx *exec_ctx, * POLLSET STUFF */ -static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset *pollset) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; +static void set_pollset(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_pollset* pollset) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset); } -static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset_set *pollset_set) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; +static void set_pollset_set(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_pollset_set* pollset_set) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set); } @@ -2794,9 +2798,9 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * BYTE STREAM */ -static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg; +static void reset_byte_stream(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_chttp2_stream* s = (grpc_chttp2_stream*)arg; s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { @@ -2813,20 +2817,20 @@ static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, } } -static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs) { +static void incoming_byte_stream_unref(grpc_exec_ctx* exec_ctx, + grpc_chttp2_incoming_byte_stream* bs) { if (gpr_unref(&bs->refs)) { gpr_free(bs); } } -static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, - void *argp, - grpc_error *error_ignored) { - grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)argp; - grpc_chttp2_transport *t = bs->transport; - grpc_chttp2_stream *s = bs->stream; +static void incoming_byte_stream_next_locked(grpc_exec_ctx* exec_ctx, + void* argp, + grpc_error* error_ignored) { + grpc_chttp2_incoming_byte_stream* bs = + (grpc_chttp2_incoming_byte_stream*)argp; + grpc_chttp2_transport* t = bs->transport; + grpc_chttp2_stream* s = bs->stream; size_t cur_length = s->frame_storage.length; if (!s->read_closed) { @@ -2868,14 +2872,14 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, incoming_byte_stream_unref(exec_ctx, bs); } -static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, +static bool incoming_byte_stream_next(grpc_exec_ctx* exec_ctx, + grpc_byte_stream* byte_stream, size_t max_size_hint, - grpc_closure *on_complete) { + grpc_closure* on_complete) { GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); - grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)byte_stream; - grpc_chttp2_stream *s = bs->stream; + grpc_chttp2_incoming_byte_stream* bs = + (grpc_chttp2_incoming_byte_stream*)byte_stream; + grpc_chttp2_stream* s = bs->stream; if (s->unprocessed_incoming_frames_buffer.length > 0) { GPR_TIMER_END("incoming_byte_stream_next", 0); return true; @@ -2894,14 +2898,14 @@ static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, } } -static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - grpc_slice *slice) { +static grpc_error* incoming_byte_stream_pull(grpc_exec_ctx* exec_ctx, + grpc_byte_stream* byte_stream, + grpc_slice* slice) { GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0); - grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)byte_stream; - grpc_chttp2_stream *s = bs->stream; - grpc_error *error; + grpc_chttp2_incoming_byte_stream* bs = + (grpc_chttp2_incoming_byte_stream*)byte_stream; + grpc_chttp2_stream* s = bs->stream; + grpc_error* error; if (s->unprocessed_incoming_frames_buffer.length > 0) { if (!s->unprocessed_incoming_frames_decompressed) { @@ -2945,27 +2949,28 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, - void *byte_stream, - grpc_error *error_ignored); +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx* exec_ctx, + void* byte_stream, + grpc_error* error_ignored); -static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream) { +static void incoming_byte_stream_destroy(grpc_exec_ctx* exec_ctx, + grpc_byte_stream* byte_stream) { GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0); - grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)byte_stream; + grpc_chttp2_incoming_byte_stream* bs = + (grpc_chttp2_incoming_byte_stream*)byte_stream; GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT( - &bs->destroy_action, incoming_byte_stream_destroy_locked, - bs, grpc_combiner_scheduler(bs->transport->combiner)), + exec_ctx, + GRPC_CLOSURE_INIT(&bs->destroy_action, + incoming_byte_stream_destroy_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_destroy", 0); } static void incoming_byte_stream_publish_error( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error) { - grpc_chttp2_stream *s = bs->stream; + grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs, + grpc_error* error) { + grpc_chttp2_stream* s = bs->stream; GPR_ASSERT(error != GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); @@ -2976,13 +2981,13 @@ static void incoming_byte_stream_publish_error( GRPC_ERROR_REF(error)); } -grpc_error *grpc_chttp2_incoming_byte_stream_push( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_slice slice, grpc_slice *slice_out) { - grpc_chttp2_stream *s = bs->stream; +grpc_error* grpc_chttp2_incoming_byte_stream_push( + grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs, + grpc_slice slice, grpc_slice* slice_out) { + grpc_chttp2_stream* s = bs->stream; if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { - grpc_error *error = + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); @@ -2997,10 +3002,10 @@ grpc_error *grpc_chttp2_incoming_byte_stream_push( } } -grpc_error *grpc_chttp2_incoming_byte_stream_finished( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error, bool reset_on_error) { - grpc_chttp2_stream *s = bs->stream; +grpc_error* grpc_chttp2_incoming_byte_stream_finished( + grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs, + grpc_error* error, bool reset_on_error) { + grpc_chttp2_stream* s = bs->stream; if (error == GRPC_ERROR_NONE) { if (bs->remaining_bytes != 0) { @@ -3014,11 +3019,11 @@ grpc_error *grpc_chttp2_incoming_byte_stream_finished( return error; } -static void incoming_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - grpc_error *error) { - grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)byte_stream; +static void incoming_byte_stream_shutdown(grpc_exec_ctx* exec_ctx, + grpc_byte_stream* byte_stream, + grpc_error* error) { + grpc_chttp2_incoming_byte_stream* bs = + (grpc_chttp2_incoming_byte_stream*)byte_stream; GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( exec_ctx, bs, error, true /* reset_on_error */)); } @@ -3027,13 +3032,13 @@ static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { incoming_byte_stream_next, incoming_byte_stream_pull, incoming_byte_stream_shutdown, incoming_byte_stream_destroy}; -static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, - void *byte_stream, - grpc_error *error_ignored) { - grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)byte_stream; - grpc_chttp2_stream *s = bs->stream; - grpc_chttp2_transport *t = s->t; +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx* exec_ctx, + void* byte_stream, + grpc_error* error_ignored) { + grpc_chttp2_incoming_byte_stream* bs = + (grpc_chttp2_incoming_byte_stream*)byte_stream; + grpc_chttp2_stream* s = bs->stream; + grpc_chttp2_transport* t = s->t; GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable); incoming_byte_stream_unref(exec_ctx, bs); @@ -3042,11 +3047,11 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } -grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, +grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create( + grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size, uint32_t flags) { - grpc_chttp2_incoming_byte_stream *incoming_byte_stream = - (grpc_chttp2_incoming_byte_stream *)gpr_malloc( + grpc_chttp2_incoming_byte_stream* incoming_byte_stream = + (grpc_chttp2_incoming_byte_stream*)gpr_malloc( sizeof(*incoming_byte_stream)); incoming_byte_stream->base.length = frame_size; incoming_byte_stream->remaining_bytes = frame_size; @@ -3064,8 +3069,8 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( * RESOURCE QUOTAS */ -static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static void post_benign_reclaimer(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { if (!t->benign_reclaimer_registered) { t->benign_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); @@ -3075,8 +3080,8 @@ static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, } } -static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static void post_destructive_reclaimer(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { if (!t->destructive_reclaimer_registered) { t->destructive_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); @@ -3086,9 +3091,9 @@ static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, } } -static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; +static void benign_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; if (error == GRPC_ERROR_NONE && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { /* Channel with no active streams: send a goaway to try and make it @@ -3116,14 +3121,14 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer"); } -static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; +static void destructive_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; size_t n = grpc_chttp2_stream_map_size(&t->stream_map); t->destructive_reclaimer_registered = false; if (error == GRPC_ERROR_NONE && n > 0) { - grpc_chttp2_stream *s = - (grpc_chttp2_stream *)grpc_chttp2_stream_map_rand(&t->stream_map); + grpc_chttp2_stream* s = + (grpc_chttp2_stream*)grpc_chttp2_stream_map_rand(&t->stream_map); if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string, s->id); @@ -3152,7 +3157,7 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, * MONITORING */ -const char *grpc_chttp2_initiate_write_reason_string( +const char* grpc_chttp2_initiate_write_reason_string( grpc_chttp2_initiate_write_reason reason) { switch (reason) { case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: @@ -3199,9 +3204,9 @@ const char *grpc_chttp2_initiate_write_reason_string( GPR_UNREACHABLE_CODE(return "unknown"); } -static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx, - grpc_transport *t) { - return ((grpc_chttp2_transport *)t)->ep; +static grpc_endpoint* chttp2_get_endpoint(grpc_exec_ctx* exec_ctx, + grpc_transport* t) { + return ((grpc_chttp2_transport*)t)->ep; } static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), @@ -3215,21 +3220,21 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), destroy_transport, chttp2_get_endpoint}; -static const grpc_transport_vtable *get_vtable(void) { return &vtable; } +static const grpc_transport_vtable* get_vtable(void) { return &vtable; } -grpc_transport *grpc_create_chttp2_transport( - grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, - grpc_endpoint *ep, int is_client) { - grpc_chttp2_transport *t = - (grpc_chttp2_transport *)gpr_zalloc(sizeof(grpc_chttp2_transport)); +grpc_transport* grpc_create_chttp2_transport( + grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args, + grpc_endpoint* ep, int is_client) { + grpc_chttp2_transport* t = + (grpc_chttp2_transport*)gpr_zalloc(sizeof(grpc_chttp2_transport)); init_transport(exec_ctx, t, channel_args, ep, is_client != 0); return &t->base; } -void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, - grpc_transport *transport, - grpc_slice_buffer *read_buffer) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; +void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx, + grpc_transport* transport, + grpc_slice_buffer* read_buffer) { + grpc_chttp2_transport* t = (grpc_chttp2_transport*)transport; GRPC_CHTTP2_REF_TRANSPORT( t, "reading_action"); /* matches unref inside reading_action */ if (read_buffer != NULL) { |