aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2017-02-26 16:54:18 -0800
committerGravatar Yuchen Zeng <zyc@google.com>2017-02-26 17:05:47 -0800
commit990d9fe1460d0f274857a5aaa8fe28f4b3852d9b (patch)
treeb2dec32850cee0aa4288e9cd4ccad56200990b9f /src/core/ext
parent345abfd6eff9db3a0e6fd9f37d1e8871e9478b84 (diff)
Client-side keepalive ping
Based on soltanmm-google's #9114
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c140
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h23
2 files changed, 163 insertions, 0 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 28a3166832..10dc6e6ff6 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -48,16 +48,19 @@
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/timeout_encoding.h"
+#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_impl.h"
#define DEFAULT_WINDOW 65535
@@ -66,6 +69,10 @@
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
+#define DEFAULT_KEEPALIVE_TIME_SECOND INT_MAX
+#define DEFAULT_KEEPALIVE_TIMEOUT_SECOND 20
+#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
+
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
int grpc_http_trace = 0;
int grpc_flowctl_trace = 0;
@@ -139,6 +146,14 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3
+/** keepalive-relevant functions */
+static void keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void keepalive_ping_ack_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -209,6 +224,73 @@ void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
#endif
+static void keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
+ if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) {
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
+ t->keepalive_ping_id = send_ping_locked(
+ exec_ctx, t,
+ grpc_closure_create(keepalive_ping_ack_locked, t,
+ grpc_combiner_scheduler(t->combiner, false)));
+ if (t->keepalive_permit_without_calls || t->stream_map.count > 0)
+ GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
+ grpc_timer_init(
+ exec_ctx, &t->keepalive_watchdog_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout),
+ grpc_closure_create(keepalive_watchdog_fired_locked, t,
+ grpc_combiner_scheduler(t->combiner, false)),
+ gpr_now(GPR_CLOCK_MONOTONIC));
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping");
+}
+
+static void keepalive_ping_ack_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
+ if (error == GRPC_ERROR_NONE) {
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
+ t->keepalive_ping_id = NULL;
+ grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
+ GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping");
+ grpc_timer_init(
+ exec_ctx, &t->keepalive_ping_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
+ grpc_closure_create(keepalive_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner, false)),
+ gpr_now(GPR_CLOCK_MONOTONIC));
+ }
+ } else {
+ GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_DYING);
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end");
+}
+
+static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
+ if (error == GRPC_ERROR_NONE) {
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
+ if (t->keepalive_ping_id != NULL) {
+ grpc_chttp2_ack_ping(exec_ctx, t, t->keepalive_ping_id);
+ }
+ close_transport_locked(exec_ctx, t,
+ GRPC_ERROR_CREATE("keepalive watchdog timeout"));
+ }
+ } else {
+ // GRPC_LOG_IF_ERROR("keepalive_watchdog", error);
+ if (error != GRPC_ERROR_CANCELLED) {
+ gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
+ t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
+ }
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog");
+}
+
static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
const grpc_channel_args *channel_args,
grpc_endpoint *ep, bool is_client) {
@@ -316,6 +398,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN),
};
+ /* client-side keepalive setting */
+ t->keepalive_time =
+ gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIME_SECOND, GPR_TIMESPAN);
+ t->keepalive_timeout =
+ gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIMEOUT_SECOND, GPR_TIMESPAN);
+ t->keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
+
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key,
@@ -363,6 +452,29 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
t->enable_bdp_probe = grpc_channel_arg_get_integer(
&channel_args->args[i], (grpc_integer_options){1, 0, 1});
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_KEEPALIVE_TIME)) {
+ const int value = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){DEFAULT_KEEPALIVE_TIME_SECOND, 1, INT_MAX});
+ t->keepalive_time = value == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_seconds(value, GPR_TIMESPAN);
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_KEEPALIVE_TIMEOUT)) {
+ const int value = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){DEFAULT_KEEPALIVE_TIMEOUT_SECOND, 0,
+ INT_MAX});
+ t->keepalive_timeout = value == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_seconds(value, GPR_TIMESPAN);
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
+ t->keepalive_permit_without_calls =
+ (uint32_t)grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE});
} else {
static const struct {
const char *channel_arg_name;
@@ -414,6 +526,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
+ // Start client-side keepalive pings
+ if (t->is_client) {
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping");
+ grpc_timer_init(
+ exec_ctx, &t->keepalive_ping_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
+ grpc_closure_create(keepalive_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner, false)),
+ gpr_now(GPR_CLOCK_MONOTONIC));
+ }
+
grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
post_benign_reclaimer(exec_ctx, t);
}
@@ -458,6 +582,22 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
+ if (t->is_client) {
+ switch (t->keepalive_state) {
+ case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: {
+ grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
+ break;
+ }
+ case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: {
+ grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
+ grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
+ break;
+ }
+ case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: {
+ break;
+ }
+ }
+ }
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream *s;
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 5d41f4bfda..7fcab0a722 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -50,6 +50,7 @@
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/pid_controller.h"
@@ -208,6 +209,12 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_closure finished_action;
};
+typedef enum {
+ GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
+ GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
+ GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
+} grpc_chttp2_keepalive_state;
+
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
gpr_refcount refs;
@@ -382,6 +389,22 @@ struct grpc_chttp2_transport {
grpc_closure benign_reclaimer_locked;
/** destructive cleanup closure */
grpc_closure destructive_reclaimer_locked;
+
+ /* keep-alive ping support */
+ /** timer to initiate ping events */
+ grpc_timer keepalive_ping_timer;
+ /** watchdog to kill the transport when waiting for the keepalive ping */
+ grpc_timer keepalive_watchdog_timer;
+ /** time duration in between pings */
+ gpr_timespec keepalive_time;
+ /** grace period for a ping to complete before watchdog kicks in */
+ gpr_timespec keepalive_timeout;
+ /** if keepalive pings are allowed when there's no outstanding streams */
+ bool keepalive_permit_without_calls;
+ /** keep-alive state machine state */
+ grpc_chttp2_keepalive_state keepalive_state;
+
+ uint8_t *keepalive_ping_id;
};
typedef enum {