aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2/transport/chttp2_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.c')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c321
1 files changed, 146 insertions, 175 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index f3268bcfca..cd788e9ce3 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -50,7 +35,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@@ -92,6 +76,10 @@ static bool g_default_keepalive_permit_without_calls =
grpc_tracer_flag grpc_http_trace = GRPC_TRACER_INITIALIZER(false);
grpc_tracer_flag grpc_flowctl_trace = GRPC_TRACER_INITIALIZER(false);
+#ifndef NDEBUG
+grpc_tracer_flag grpc_trace_chttp2_refcount = GRPC_TRACER_INITIALIZER(false);
+#endif
+
static const grpc_transport_vtable vtable;
/* forward declarations of various callbacks that we'll build closures around */
@@ -107,8 +95,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t,
static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_error *error);
/** Set a transport level setting, and push it to our peer */
-static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_setting_id id, uint32_t value);
+static void queue_setting_update(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_setting_id id, uint32_t value);
static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
@@ -228,20 +217,26 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
gpr_free(t);
}
-#ifdef GRPC_CHTTP2_REFCOUNTING_DEBUG
+#ifndef NDEBUG
void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, const char *reason,
const char *file, int line) {
- gpr_log(GPR_DEBUG, "chttp2:unref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]", t,
- t->refs.count, t->refs.count - 1, reason, file, line);
+ if (GRPC_TRACER_ON(grpc_trace_chttp2_refcount)) {
+ gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count);
+ gpr_log(GPR_DEBUG, "chttp2:unref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]",
+ t, val, val - 1, reason, file, line);
+ }
if (!gpr_unref(&t->refs)) return;
destruct_transport(exec_ctx, t);
}
void grpc_chttp2_ref_transport(grpc_chttp2_transport *t, const char *reason,
const char *file, int line) {
- gpr_log(GPR_DEBUG, "chttp2: ref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]", t,
- t->refs.count, t->refs.count + 1, reason, file, line);
+ if (GRPC_TRACER_ON(grpc_trace_chttp2_refcount)) {
+ gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count);
+ gpr_log(GPR_DEBUG, "chttp2: ref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]",
+ t, val, val + 1, reason, file, line);
+ }
gpr_ref(&t->refs);
}
#else
@@ -267,7 +262,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ep = ep;
/* one ref is for destroy */
gpr_ref_init(&t->refs, 1);
- t->combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep));
+ t->combiner = grpc_combiner_create();
t->peer_string = grpc_endpoint_get_peer(ep);
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
@@ -285,32 +280,32 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_slice_buffer_init(&t->outbuf);
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
- grpc_closure_init(&t->write_action, write_action, t,
+ GRPC_CLOSURE_INIT(&t->write_action, write_action, t,
grpc_schedule_on_exec_ctx);
- grpc_closure_init(&t->read_action_locked, read_action_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
- grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
- grpc_closure_init(&t->destructive_reclaimer_locked,
+ GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
- grpc_closure_init(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
- t, grpc_combiner_scheduler(t->combiner, false));
- grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
- grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
- grpc_closure_init(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
- t, grpc_combiner_scheduler(t->combiner, false));
- grpc_closure_init(&t->start_keepalive_ping_locked,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
+ t, grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
+ t, grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
start_keepalive_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
- grpc_closure_init(&t->finish_keepalive_ping_locked,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
finish_keepalive_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
- grpc_closure_init(&t->keepalive_watchdog_fired_locked,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string);
t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
@@ -353,20 +348,21 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
GRPC_CHTTP2_CLIENT_CONNECT_STRING));
- grpc_chttp2_initiate_write(exec_ctx, t, false, "initial_write");
+ grpc_chttp2_initiate_write(exec_ctx, t, "initial_write");
}
/* configure http2 the way we like it */
if (is_client) {
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
+ queue_setting_update(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
}
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
- DEFAULT_WINDOW);
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
- DEFAULT_MAX_HEADER_LIST_SIZE);
- push_setting(exec_ctx, t,
- GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ DEFAULT_WINDOW);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ DEFAULT_MAX_HEADER_LIST_SIZE);
+ queue_setting_update(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
t->ping_policy = (grpc_chttp2_repeated_ping_policy){
.max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA,
@@ -533,8 +529,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
int value = grpc_channel_arg_get_integer(
&channel_args->args[i], settings_map[j].integer_options);
if (value >= 0) {
- push_setting(exec_ctx, t, settings_map[j].setting_id,
- (uint32_t)value);
+ queue_setting_update(exec_ctx, t, settings_map[j].setting_id,
+ (uint32_t)value);
}
}
break;
@@ -565,7 +561,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
- grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
+ grpc_chttp2_initiate_write(exec_ctx, t, "init");
post_benign_reclaimer(exec_ctx, t);
}
@@ -583,9 +579,9 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_closure_sched(exec_ctx, grpc_closure_create(
- destroy_transport_locked, t,
- grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_CREATE(destroy_transport_locked, t,
+ grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
}
@@ -636,7 +632,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#ifndef NDEBUG
void grpc_chttp2_stream_ref(grpc_chttp2_stream *s, const char *reason) {
grpc_stream_ref(s->refcount, reason);
}
@@ -672,13 +668,13 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_data_parser_init(&s->data_parser);
grpc_slice_buffer_init(&s->flow_controlled_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
- grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
+ GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
grpc_slice_buffer_init(&s->frame_storage);
s->pending_byte_stream = false;
- grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
- grpc_combiner_scheduler(t->combiner, false));
+ GRPC_CLOSURE_INIT(&s->reset_byte_stream, reset_byte_stream, s,
+ grpc_combiner_scheduler(t->combiner));
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@@ -749,7 +745,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_TIMER_END("destroy_stream", 0);
- grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
@@ -760,9 +756,9 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
s->destroy_stream_arg = then_schedule_closure;
- grpc_closure_sched(
- exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s,
- grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_CLOSURE_SCHED(
+ exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
+ grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("destroy_stream", 0);
}
@@ -800,8 +796,6 @@ static const char *write_state_name(grpc_chttp2_write_state st) {
return "WRITING";
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
return "WRITING+MORE";
- case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
- return "WRITING+MORE+COVERED";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
@@ -814,7 +808,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
write_state_name(st), reason));
t->write_state = st;
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
- grpc_closure_list_sched(exec_ctx, &t->run_after_write);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &t->run_after_write);
if (t->close_transport_on_writes_finished != NULL) {
grpc_error *err = t->close_transport_on_writes_finished;
t->close_transport_on_writes_finished = NULL;
@@ -824,38 +818,25 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- bool covered_by_poller, const char *reason) {
+ grpc_chttp2_transport *t, const char *reason) {
GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0);
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx,
- grpc_closure_init(
- &t->write_action_begin_locked, write_action_begin_locked, t,
- grpc_combiner_finally_scheduler(t->combiner, covered_by_poller)),
+ GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
+ write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner)),
GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
- set_write_state(
- exec_ctx, t,
- covered_by_poller
- ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER
- : GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
- reason);
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
+ reason);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
- if (covered_by_poller) {
- set_write_state(
- exec_ctx, t,
- GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
- reason);
- }
- break;
- case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
break;
}
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
@@ -871,10 +852,10 @@ void grpc_chttp2_become_writable(
case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK:
break;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED:
- grpc_chttp2_initiate_write(exec_ctx, t, true, reason);
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
break;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED:
- grpc_chttp2_initiate_write(exec_ctx, t, false, reason);
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
break;
}
}
@@ -894,12 +875,12 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
case GRPC_CHTTP2_PARTIAL_WRITE:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
"begin writing partial");
- grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_FULL_WRITE:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"begin writing");
- grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
break;
}
GPR_TIMER_END("write_action_begin_locked", 0);
@@ -910,8 +891,8 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
GPR_TIMER_BEGIN("write_action", 0);
grpc_endpoint_write(
exec_ctx, t->ep, &t->outbuf,
- grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t,
- grpc_combiner_scheduler(t->combiner, false)));
+ GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
+ grpc_combiner_scheduler(t->combiner)));
GPR_TIMER_END("write_action", 0);
}
@@ -945,23 +926,11 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [!covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_closure_run(
+ GRPC_CLOSURE_RUN(
exec_ctx,
- grpc_closure_init(
- &t->write_action_begin_locked, write_action_begin_locked, t,
- grpc_combiner_finally_scheduler(t->combiner, false)),
- GRPC_ERROR_NONE);
- break;
- case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
- GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
- set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
- "continue writing [covered]");
- GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_closure_run(
- exec_ctx,
- grpc_closure_init(&t->write_action_begin_locked,
+ GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
write_action_begin_locked, t,
- grpc_combiner_finally_scheduler(t->combiner, true)),
+ grpc_combiner_finally_scheduler(t->combiner)),
GRPC_ERROR_NONE);
break;
}
@@ -972,8 +941,11 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("terminate_writing_with_lock", 0);
}
-static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_setting_id id, uint32_t value) {
+// Dirties an HTTP2 setting to be sent out next time a writing path occurs.
+// If the change needs to occur immediately, manually initiate a write.
+static void queue_setting_update(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_setting_id id, uint32_t value) {
const grpc_chttp2_setting_parameters *sp =
&grpc_chttp2_settings_parameters[id];
uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
@@ -984,7 +956,6 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->dirtied_local_settings = 1;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "push_setting");
}
}
@@ -1089,7 +1060,7 @@ static void null_then_run_closure(grpc_exec_ctx *exec_ctx,
grpc_closure **closure, grpc_error *error) {
grpc_closure *c = *closure;
*closure = NULL;
- grpc_closure_run(exec_ctx, c, error);
+ GRPC_CLOSURE_RUN(exec_ctx, c, error);
}
void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
@@ -1131,7 +1102,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
}
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
- grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ GRPC_CLOSURE_RUN(exec_ctx, closure, closure->error_data.error);
} else {
grpc_closure_list_append(&t->run_after_write, closure,
closure->error_data.error);
@@ -1267,7 +1238,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_closure *on_complete = op->on_complete;
if (on_complete == NULL) {
on_complete =
- grpc_closure_create(do_nothing, NULL, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_CREATE(do_nothing, NULL, grpc_schedule_on_exec_ctx);
}
/* use final_data as a barrier until enqueue time; the inital counter is
@@ -1380,7 +1351,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->next_message_end_offset = s->flow_controlled_bytes_written +
(int64_t)s->flow_controlled_buffer.length +
(int64_t)len;
- s->complete_fetch_covered_by_poller = op->covered_by_poller;
if (flags & GRPC_WRITE_BUFFER_HINT) {
s->next_message_end_offset -= t->write_buffer_size;
s->write_buffering = true;
@@ -1447,6 +1417,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
op_payload->recv_initial_metadata.recv_initial_metadata_ready;
s->recv_initial_metadata =
op_payload->recv_initial_metadata.recv_initial_metadata;
+ s->trailing_metadata_available =
+ op_payload->recv_initial_metadata.trailing_metadata_available;
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
}
@@ -1500,11 +1472,10 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
op->handler_private.extra_arg = gs;
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx,
- grpc_closure_init(
- &op->handler_private.closure, perform_stream_op_locked, op,
- grpc_combiner_scheduler(t->combiner, op->covered_by_poller)),
+ GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked,
+ op, grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op", 0);
}
@@ -1517,7 +1488,7 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_ping_queue *pq = &t->ping_queues[i];
for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
- grpc_closure_list_sched(exec_ctx, &pq->lists[j]);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]);
}
}
GRPC_ERROR_UNREF(error);
@@ -1531,7 +1502,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_NONE);
if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
GRPC_ERROR_NONE)) {
- grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping");
+ grpc_chttp2_initiate_write(exec_ctx, t, "send_ping");
}
}
@@ -1539,7 +1510,7 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
t->ping_state.is_delayed_ping_timer_set = false;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "retry_send_ping");
+ grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping");
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1552,9 +1523,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_free(from);
return;
}
- grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
- grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings");
+ grpc_chttp2_initiate_write(exec_ctx, t, "continue_pings");
}
}
@@ -1567,7 +1538,7 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&slice, &http_error);
grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error,
grpc_slice_ref_internal(slice), &t->qbuf);
- grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
+ grpc_chttp2_initiate_write(exec_ctx, t, "goaway_sent");
GRPC_ERROR_UNREF(error);
}
@@ -1593,12 +1564,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = op->handler_private.extra_arg;
grpc_error *close_transport = op->disconnect_with_error;
- if (op->on_connectivity_state_change != NULL) {
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
- op->on_connectivity_state_change);
- }
-
if (op->goaway_error) {
send_goaway(exec_ctx, t, op->goaway_error);
}
@@ -1622,11 +1587,17 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
op->send_ping);
}
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ }
+
if (close_transport != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, close_transport);
}
- grpc_closure_run(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "transport_op");
}
@@ -1638,11 +1609,11 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_free(msg);
op->handler_private.extra_arg = gt;
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
- grpc_closure_sched(
- exec_ctx, grpc_closure_init(&op->handler_private.closure,
- perform_transport_op_locked, op,
- grpc_combiner_scheduler(t->combiner, false)),
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_INIT(&op->handler_private.closure,
+ perform_transport_op_locked, op,
+ grpc_combiner_scheduler(t->combiner)),
+ GRPC_ERROR_NONE);
}
/*******************************************************************************
@@ -1797,7 +1768,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error,
&s->stats.outgoing));
- grpc_chttp2_initiate_write(exec_ctx, t, false, "rst_stream");
+ grpc_chttp2_initiate_write(exec_ctx, t, "rst_stream");
}
}
if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
@@ -2110,7 +2081,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&s->stats.outgoing));
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error);
- grpc_chttp2_initiate_write(exec_ctx, t, false, "close_from_api");
+ grpc_chttp2_initiate_write(exec_ctx, t, "close_from_api");
}
typedef struct {
@@ -2152,8 +2123,8 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string,
(int)bdp);
}
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
- (uint32_t)bdp);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ (uint32_t)bdp);
}
static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -2172,8 +2143,8 @@ static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_DEBUG, "%s: update max_frame size to %d", t->peer_string,
(int)frame_size);
}
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- (uint32_t)frame_size);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ (uint32_t)frame_size);
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -2517,7 +2488,7 @@ static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
} else {
GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
s->on_next = NULL;
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_NONE;
@@ -2596,9 +2567,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
if (s->frame_storage.length > 0) {
grpc_slice_buffer_swap(&s->frame_storage,
&s->unprocessed_incoming_frames_buffer);
- grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
- grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != NULL) {
incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
@@ -2608,7 +2579,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
if (bs->remaining_bytes != 0) {
s->byte_stream_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != NULL) {
incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
@@ -2639,11 +2610,11 @@ static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
gpr_ref(&bs->refs);
bs->next_action.max_size_hint = max_size_hint;
bs->next_action.on_complete = on_complete;
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx,
- grpc_closure_init(
- &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_CLOSURE_INIT(&bs->next_action.closure,
+ incoming_byte_stream_next_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return false;
@@ -2668,7 +2639,7 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
} else {
grpc_error *error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
GPR_TIMER_END("incoming_byte_stream_pull", 0);
@@ -2697,11 +2668,10 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(
- &bs->destroy_action, incoming_byte_stream_destroy_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_CLOSURE_SCHED(
+ exec_ctx, GRPC_CLOSURE_INIT(
+ &bs->destroy_action, incoming_byte_stream_destroy_locked,
+ bs, grpc_combiner_scheduler(bs->transport->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_destroy", 0);
}
@@ -2712,7 +2682,7 @@ static void incoming_byte_stream_publish_error(
grpc_chttp2_stream *s = bs->stream;
GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
s->on_next = NULL;
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_REF(error);
@@ -2729,7 +2699,7 @@ grpc_error *grpc_chttp2_incoming_byte_stream_push(
grpc_error *error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
- grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
grpc_slice_unref_internal(exec_ctx, slice);
return error;
} else {
@@ -2752,7 +2722,7 @@ grpc_error *grpc_chttp2_incoming_byte_stream_finished(
}
}
if (error != GRPC_ERROR_NONE && reset_on_error) {
- grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
}
incoming_byte_stream_unref(exec_ctx, bs);
return error;
@@ -2772,6 +2742,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
+ GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_NONE;
return incoming_byte_stream;
}
@@ -2986,5 +2957,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer);
}
- grpc_closure_sched(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE);
}