diff options
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 170 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 2 | ||||
-rw-r--r-- | test/core/end2end/tests/keepalive_timeout.c | 8 |
3 files changed, 101 insertions, 79 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 10dc6e6ff6..f9ee71fb11 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -147,10 +147,12 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, #define DEFAULT_MAX_PINGS_BETWEEN_DATA 3 /** keepalive-relevant functions */ -static void keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void keepalive_ping_ack_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); +static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); @@ -224,73 +226,6 @@ void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } #endif -static void keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = arg; - GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); - if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; - GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); - t->keepalive_ping_id = send_ping_locked( - exec_ctx, t, - grpc_closure_create(keepalive_ping_ack_locked, t, - grpc_combiner_scheduler(t->combiner, false))); - if (t->keepalive_permit_without_calls || t->stream_map.count > 0) - GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); - grpc_timer_init( - exec_ctx, &t->keepalive_watchdog_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout), - grpc_closure_create(keepalive_watchdog_fired_locked, t, - grpc_combiner_scheduler(t->combiner, false)), - gpr_now(GPR_CLOCK_MONOTONIC)); - } - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping"); -} - -static void keepalive_ping_ack_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = arg; - if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { - if (error == GRPC_ERROR_NONE) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - t->keepalive_ping_id = NULL; - grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); - GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping"); - grpc_timer_init( - exec_ctx, &t->keepalive_ping_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), - grpc_closure_create(keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner, false)), - gpr_now(GPR_CLOCK_MONOTONIC)); - } - } else { - GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_DYING); - } - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); -} - -static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = arg; - if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { - if (error == GRPC_ERROR_NONE) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - if (t->keepalive_ping_id != NULL) { - grpc_chttp2_ack_ping(exec_ctx, t, t->keepalive_ping_id); - } - close_transport_locked(exec_ctx, t, - GRPC_ERROR_CREATE("keepalive watchdog timeout")); - } - } else { - // GRPC_LOG_IF_ERROR("keepalive_watchdog", error); - if (error != GRPC_ERROR_CANCELLED) { - gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", - t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); - } - } - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog"); -} - static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, const grpc_channel_args *channel_args, grpc_endpoint *ep, bool is_client) { @@ -337,6 +272,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_combiner_scheduler(t->combiner, false)); grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->start_keepalive_ping_locked, + start_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->finish_keepalive_ping_locked, + finish_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); @@ -400,9 +341,14 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* client-side keepalive setting */ t->keepalive_time = - gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIME_SECOND, GPR_TIMESPAN); + DEFAULT_KEEPALIVE_TIME_SECOND == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIME_SECOND, GPR_TIMESPAN); t->keepalive_timeout = - gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIMEOUT_SECOND, GPR_TIMESPAN); + DEFAULT_KEEPALIVE_TIMEOUT_SECOND == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIMEOUT_SECOND, + GPR_TIMESPAN); t->keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; if (channel_args) { @@ -529,11 +475,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, // Start client-side keepalive pings if (t->is_client) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping"); + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init( exec_ctx, &t->keepalive_ping_timer, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), - grpc_closure_create(keepalive_ping_locked, t, + grpc_closure_create(init_keepalive_ping_locked, t, grpc_combiner_scheduler(t->combiner, false)), gpr_now(GPR_CLOCK_MONOTONIC)); } @@ -2127,6 +2073,80 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); } +static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); + if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) { + if (t->keepalive_permit_without_calls || t->stream_map.count > 0) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, + &t->start_keepalive_ping_locked, + &t->finish_keepalive_ping_locked); + } else { + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + grpc_closure_create(init_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)), + gpr_now(GPR_CLOCK_MONOTONIC)); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); +} + +static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); + grpc_timer_init( + exec_ctx, &t->keepalive_watchdog_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout), + grpc_closure_create(keepalive_watchdog_fired_locked, t, + grpc_combiner_scheduler(t->combiner, false)), + gpr_now(GPR_CLOCK_MONOTONIC)); +} + +static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + grpc_closure_create(init_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)), + gpr_now(GPR_CLOCK_MONOTONIC)); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); +} + +static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + close_transport_locked(exec_ctx, t, + GRPC_ERROR_CREATE("keepalive watchdog timeout")); + } + } else { + // The watchdog timer should have been cancelled. + if (error != GRPC_ERROR_CANCELLED) { + gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", + t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog"); +} + /******************************************************************************* * CALLBACK LOOP */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 7fcab0a722..0e3e900da0 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -391,6 +391,8 @@ struct grpc_chttp2_transport { grpc_closure destructive_reclaimer_locked; /* keep-alive ping support */ + grpc_closure start_keepalive_ping_locked; + grpc_closure finish_keepalive_ping_locked; /** timer to initiate ping events */ grpc_timer keepalive_ping_timer; /** watchdog to kill the transport when waiting for the keepalive ping */ diff --git a/test/core/end2end/tests/keepalive_timeout.c b/test/core/end2end/tests/keepalive_timeout.c index 40a6bb3db5..b1f53b17c3 100644 --- a/test/core/end2end/tests/keepalive_timeout.c +++ b/test/core/end2end/tests/keepalive_timeout.c @@ -61,7 +61,7 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, } static gpr_timespec n_seconds_time(int n) { - return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); + return grpc_timeout_seconds_to_deadline(n); } static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); } @@ -76,9 +76,9 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) - .type == GRPC_OP_COMPLETE); + GPR_ASSERT( + grpc_completion_queue_pluck(f->cq, tag(1000), five_seconds_time(), NULL) + .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; } |