aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c184
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c22
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h18
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c20
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c2
-rw-r--r--src/node/ext/byte_buffer.cc39
-rw-r--r--src/node/ext/byte_buffer.h4
-rw-r--r--src/node/ext/slice.cc5
-rw-r--r--src/node/performance/benchmark_server.js8
-rw-r--r--src/node/performance/generic_service.js11
-rw-r--r--src/node/performance/worker_service_impl.js23
-rw-r--r--src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py5
12 files changed, 264 insertions, 77 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 0f423c1bcf..63effe4011 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -69,13 +69,21 @@
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
-#define DEFAULT_CLIENT_KEEPALIVE_TIME_S INT_MAX
-#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_S 20
+#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
+#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
+#define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000 /* 2 hours */
+#define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
-
-static int g_default_client_keepalive_time_s = DEFAULT_CLIENT_KEEPALIVE_TIME_S;
-static int g_default_client_keepalive_timeout_s =
- DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_S;
+#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
+
+static int g_default_client_keepalive_time_ms =
+ DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
+static int g_default_client_keepalive_timeout_ms =
+ DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
+static int g_default_server_keepalive_time_ms =
+ DEFAULT_SERVER_KEEPALIVE_TIME_MS;
+static int g_default_server_keepalive_timeout_ms =
+ DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
static bool g_default_keepalive_permit_without_calls =
DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
@@ -153,6 +161,8 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3
+#define DEFAULT_MAX_PING_STRIKES 2
+#define DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
/** keepalive-relevant functions */
static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -353,19 +363,35 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
.max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA,
.min_time_between_pings =
gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN),
+ .max_ping_strikes = DEFAULT_MAX_PING_STRIKES,
+ .min_ping_interval_without_data = gpr_time_from_millis(
+ DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, GPR_TIMESPAN),
};
- /* client-side keepalive setting */
- t->keepalive_time =
- g_default_client_keepalive_time_s == INT_MAX
- ? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_seconds(g_default_client_keepalive_time_s,
- GPR_TIMESPAN);
- t->keepalive_timeout =
- g_default_client_keepalive_timeout_s == INT_MAX
- ? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_seconds(g_default_client_keepalive_timeout_s,
- GPR_TIMESPAN);
+ /* Keepalive setting */
+ if (t->is_client) {
+ t->keepalive_time =
+ g_default_client_keepalive_time_ms == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(g_default_client_keepalive_time_ms,
+ GPR_TIMESPAN);
+ t->keepalive_timeout =
+ g_default_client_keepalive_timeout_ms == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(g_default_client_keepalive_timeout_ms,
+ GPR_TIMESPAN);
+ } else {
+ t->keepalive_time =
+ g_default_server_keepalive_time_ms == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(g_default_server_keepalive_time_ms,
+ GPR_TIMESPAN);
+ t->keepalive_timeout =
+ g_default_server_keepalive_timeout_ms == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(g_default_server_keepalive_timeout_ms,
+ GPR_TIMESPAN);
+ }
t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls;
if (channel_args) {
@@ -399,6 +425,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&channel_args->args[i],
(grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX});
} else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
+ t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_PING_STRIKES, 0, INT_MAX});
+ } else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) {
t->ping_policy.min_time_between_pings = gpr_time_from_millis(
grpc_channel_arg_get_integer(
@@ -406,6 +437,15 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
(grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0,
INT_MAX}),
GPR_TIMESPAN);
+ } else if (0 ==
+ strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_PING_INTERVAL_WITHOUT_DATA_MS)) {
+ t->ping_policy.min_ping_interval_without_data = gpr_time_from_millis(
+ grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){
+ DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, 0, INT_MAX}),
+ GPR_TIMESPAN);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer(
@@ -416,23 +456,27 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->enable_bdp_probe = grpc_channel_arg_get_integer(
&channel_args->args[i], (grpc_integer_options){1, 0, 1});
} else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_CLIENT_KEEPALIVE_TIME_S)) {
+ GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer(
&channel_args->args[i],
- (grpc_integer_options){g_default_client_keepalive_time_s, 1,
- INT_MAX});
+ (grpc_integer_options){t->is_client
+ ? g_default_client_keepalive_time_ms
+ : g_default_server_keepalive_time_ms,
+ 1, INT_MAX});
t->keepalive_time = value == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_seconds(value, GPR_TIMESPAN);
+ : gpr_time_from_millis(value, GPR_TIMESPAN);
} else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S)) {
+ GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
const int value = grpc_channel_arg_get_integer(
&channel_args->args[i],
- (grpc_integer_options){g_default_client_keepalive_timeout_s, 0,
- INT_MAX});
+ (grpc_integer_options){t->is_client
+ ? g_default_client_keepalive_timeout_ms
+ : g_default_server_keepalive_timeout_ms,
+ 0, INT_MAX});
t->keepalive_timeout = value == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_seconds(value, GPR_TIMESPAN);
+ : gpr_time_from_millis(value, GPR_TIMESPAN);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
t->keepalive_permit_without_calls =
@@ -495,8 +539,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ping_policy.max_pings_without_data;
t->ping_state.is_delayed_ping_timer_set = false;
- /** Start client-side keepalive pings */
- if (t->is_client) {
+ t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+
+ /* Start keepalive pings */
+ if (gpr_time_cmp(t->keepalive_time, gpr_inf_future(GPR_TIMESPAN)) != 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
@@ -925,6 +972,26 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
// GRPC_CHTTP2_IF_TRACING(
// gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
t->seen_goaway = 1;
+
+ /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
+ * data equal to “too_many_pings”, it should log the occurrence at a log level
+ * that is enabled by default and double the configured KEEPALIVE_TIME used
+ * for new connections on that channel. */
+ if (t->is_client && goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
+ grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0) {
+ gpr_log(GPR_ERROR,
+ "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
+ "data equal to \"too_many_pings\"");
+ double current_keepalive_time_ms =
+ gpr_timespec_to_micros(t->keepalive_time) / 1000;
+ t->keepalive_time =
+ current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis((int64_t)(current_keepalive_time_ms *
+ KEEPALIVE_TIME_BACKOFF_MULTIPLIER),
+ GPR_TIMESPAN);
+ }
+
/* lie: use transient failure from the transport to indicate goaway has been
* received */
connectivity_state_set(
@@ -1466,6 +1533,21 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
+void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ gpr_log(GPR_DEBUG, "PING strike");
+ if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
+ t->ping_policy.max_ping_strikes != 0) {
+ send_goaway(exec_ctx, t,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
+ GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
+ /*The transport will be closed after the write is done */
+ close_transport_locked(
+ exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"));
+ }
+}
+
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op,
grpc_error *error_ignored) {
@@ -2132,6 +2214,10 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string);
}
+ /* Reset the keepalive ping timer */
+ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
+ grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
+ }
grpc_bdp_estimator_start_ping(&t->bdp_estimator);
}
@@ -2146,20 +2232,32 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
}
-void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args) {
+void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,
+ bool is_client) {
size_t i;
if (args) {
for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_CLIENT_KEEPALIVE_TIME_S)) {
- g_default_client_keepalive_time_s = grpc_channel_arg_get_integer(
- &args->args[i], (grpc_integer_options){
- g_default_client_keepalive_time_s, 1, INT_MAX});
- } else if (0 == strcmp(args->args[i].key,
- GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S)) {
- g_default_client_keepalive_timeout_s = grpc_channel_arg_get_integer(
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){g_default_client_keepalive_time_ms, 1,
+ INT_MAX});
+ if (is_client) {
+ g_default_client_keepalive_time_ms = value;
+ } else {
+ g_default_server_keepalive_time_ms = value;
+ }
+ } else if (0 ==
+ strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
+ const int value = grpc_channel_arg_get_integer(
&args->args[i],
- (grpc_integer_options){g_default_client_keepalive_timeout_s, 0,
+ (grpc_integer_options){g_default_client_keepalive_timeout_ms, 0,
INT_MAX});
+ if (is_client) {
+ g_default_client_keepalive_timeout_ms = value;
+ } else {
+ g_default_server_keepalive_timeout_ms = value;
+ }
} else if (0 == strcmp(args->args[i].key,
GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
g_default_keepalive_permit_without_calls =
@@ -2177,7 +2275,8 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_transport *t = arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) {
- if (t->keepalive_permit_without_calls || t->stream_map.count > 0) {
+ if (t->keepalive_permit_without_calls ||
+ grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE,
@@ -2190,6 +2289,13 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
&t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
}
+ } else if (error == GRPC_ERROR_CANCELLED && !(t->destroying || t->closed)) {
+ /* The keepalive ping timer may be cancelled by bdp */
+ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
+ grpc_timer_init(
+ exec_ctx, &t->keepalive_ping_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
+ &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping");
}
@@ -2231,8 +2337,8 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
"keepalive watchdog timeout"));
}
} else {
- /** The watchdog timer should have been cancelled by
- finish_keepalive_ping_locked. */
+ /* The watchdog timer should have been cancelled by
+ * finish_keepalive_ping_locked. */
if (error != GRPC_ERROR_CANCELLED) {
gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index 46dafdb62f..6016e43127 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -103,6 +103,28 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
if (p->is_ack) {
grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes);
} else {
+ if (!t->is_client) {
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec next_allowed_ping =
+ gpr_time_add(t->ping_recv_state.last_ping_recv_time,
+ t->ping_policy.min_ping_interval_without_data);
+
+ if (t->keepalive_permit_without_calls == 0 &&
+ grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ /* According to RFC1122, the interval of TCP Keep-Alive is default to
+ no less than two hours. When there is no outstanding streams, we
+ restrict the number of PINGS equivalent to TCP Keep-Alive. */
+ next_allowed_ping =
+ gpr_time_add(t->ping_recv_state.last_ping_recv_time,
+ gpr_time_from_seconds(7200, GPR_TIMESPAN));
+ }
+
+ if (gpr_time_cmp(next_allowed_ping, now) > 0) {
+ grpc_chttp2_add_ping_strike(exec_ctx, t);
+ }
+
+ t->ping_recv_state.last_ping_recv_time = now;
+ }
if (!g_disable_ping_ack) {
if (t->ping_ack_count == t->ping_ack_capacity) {
t->ping_ack_capacity = GPR_MAX(t->ping_ack_capacity * 3 / 2, 3);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 50993e4ecd..6eb848b8d7 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -97,6 +97,8 @@ typedef struct {
typedef struct {
gpr_timespec min_time_between_pings;
int max_pings_without_data;
+ int max_ping_strikes;
+ gpr_timespec min_ping_interval_without_data;
} grpc_chttp2_repeated_ping_policy;
typedef struct {
@@ -106,6 +108,11 @@ typedef struct {
bool is_delayed_ping_timer_set;
} grpc_chttp2_repeated_ping_state;
+typedef struct {
+ gpr_timespec last_ping_recv_time;
+ int ping_strikes;
+} grpc_chttp2_server_ping_recv_state;
+
/* deframer state for the overall http2 stream of bytes */
typedef enum {
/* prefix: one entry per http2 connection prefix byte */
@@ -316,6 +323,7 @@ struct grpc_chttp2_transport {
size_t ping_ack_count;
size_t ping_ack_capacity;
uint64_t *ping_acks;
+ grpc_chttp2_server_ping_recv_state ping_recv_state;
/** parser for headers */
grpc_chttp2_hpack_parser hpack_parser;
@@ -792,6 +800,13 @@ void grpc_chttp2_incoming_byte_stream_finished(
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint64_t id);
+/** Add a new ping strike to ping_recv_state.ping_strikes. If
+ ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY
+ with error code ENHANCE_YOUR_CALM and additional debug data resembling
+ “too_many_pings” followed by immediately closing the connection. */
+void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
+
typedef enum {
/* don't initiate a transport write, but piggyback on the next one */
GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
@@ -831,6 +846,7 @@ uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t);
/** Set the default keepalive configurations, must only be called at
initialization */
-void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args);
+void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,
+ bool is_client);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index b3918df7dc..ae9df175ff 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -237,6 +237,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
now_writing = true;
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
+ if (!t->is_client) {
+ t->ping_recv_state.last_ping_recv_time =
+ gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+ }
}
/* send any window updates */
if (s->announce_window > 0) {
@@ -246,6 +251,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
s->id, s->announce_window, &s->stats.outgoing));
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
+ if (!t->is_client) {
+ t->ping_recv_state.last_ping_recv_time =
+ gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+ }
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce);
}
if (sent_initial_metadata) {
@@ -278,6 +288,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
send_bytes);
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
+ if (!t->is_client) {
+ t->ping_recv_state.last_ping_recv_time =
+ gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+ }
if (is_last_frame) {
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = true;
@@ -363,6 +378,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
0, announced, &throwaway_stats));
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
+ if (!t->is_client) {
+ t->ping_recv_state.last_ping_recv_time =
+ gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+ }
}
for (size_t i = 0; i < t->ping_ack_count; i++) {
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 9bd8914b98..0b9189558f 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -1178,7 +1178,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
if (stream_state->rs.compressed) {
stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS;
}
- *((grpc_byte_buffer **)stream_op->recv_message) =
+ *((grpc_byte_buffer **)stream_op->payload->recv_message.recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
grpc_closure_sched(exec_ctx,
stream_op->payload->recv_message.recv_message_ready,
diff --git a/src/node/ext/byte_buffer.cc b/src/node/ext/byte_buffer.cc
index 7d6fb19860..a99b96bea5 100644
--- a/src/node/ext/byte_buffer.cc
+++ b/src/node/ext/byte_buffer.cc
@@ -45,6 +45,7 @@
namespace grpc {
namespace node {
+using Nan::Callback;
using Nan::MaybeLocal;
using v8::Function;
@@ -62,7 +63,11 @@ grpc_byte_buffer *BufferToByteBuffer(Local<Value> buffer) {
}
namespace {
-void delete_buffer(char *data, void *hint) { delete[] data; }
+void delete_buffer(char *data, void *hint) {
+ grpc_slice *slice = static_cast<grpc_slice *>(hint);
+ grpc_slice_unref(*slice);
+ delete slice;
+}
}
Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) {
@@ -75,31 +80,15 @@ Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) {
Nan::ThrowError("Error initializing byte buffer reader.");
return scope.Escape(Nan::Undefined());
}
- grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
- size_t length = GRPC_SLICE_LENGTH(slice);
- char *result = new char[length];
- memcpy(result, GRPC_SLICE_START_PTR(slice), length);
- grpc_slice_unref(slice);
- return scope.Escape(MakeFastBuffer(
- Nan::NewBuffer(result, length, delete_buffer, NULL).ToLocalChecked()));
+ grpc_slice *slice = new grpc_slice;
+ *slice = grpc_byte_buffer_reader_readall(&reader);
+ grpc_byte_buffer_reader_destroy(&reader);
+ char *result = reinterpret_cast<char *>(GRPC_SLICE_START_PTR(*slice));
+ size_t length = GRPC_SLICE_LENGTH(*slice);
+ Local<Value> buf =
+ Nan::NewBuffer(result, length, delete_buffer, slice).ToLocalChecked();
+ return scope.Escape(buf);
}
-Local<Value> MakeFastBuffer(Local<Value> slowBuffer) {
- Nan::EscapableHandleScope scope;
- Local<Object> globalObj = Nan::GetCurrentContext()->Global();
- MaybeLocal<Value> constructorValue = Nan::Get(
- globalObj, Nan::New("Buffer").ToLocalChecked());
- Local<Function> bufferConstructor = Local<Function>::Cast(
- constructorValue.ToLocalChecked());
- const int argc = 3;
- Local<Value> consArgs[argc] = {
- slowBuffer,
- Nan::New<Number>(::node::Buffer::Length(slowBuffer)),
- Nan::New<Number>(0)
- };
- MaybeLocal<Object> fastBuffer = Nan::NewInstance(bufferConstructor,
- argc, consArgs);
- return scope.Escape(fastBuffer.ToLocalChecked());
-}
} // namespace node
} // namespace grpc
diff --git a/src/node/ext/byte_buffer.h b/src/node/ext/byte_buffer.h
index 55bc0ab377..e8c4ac90bd 100644
--- a/src/node/ext/byte_buffer.h
+++ b/src/node/ext/byte_buffer.h
@@ -50,10 +50,6 @@ grpc_byte_buffer *BufferToByteBuffer(v8::Local<v8::Value> buffer);
/* Convert a grpc_byte_buffer to a Node.js Buffer */
v8::Local<v8::Value> ByteBufferToBuffer(grpc_byte_buffer *buffer);
-/* Convert a ::node::Buffer to a fast Buffer, as defined in the Node
- Buffer documentation */
-v8::Local<v8::Value> MakeFastBuffer(v8::Local<v8::Value> slowBuffer);
-
} // namespace node
} // namespace grpc
diff --git a/src/node/ext/slice.cc b/src/node/ext/slice.cc
index 98a80b3d2f..104dd9e22c 100644
--- a/src/node/ext/slice.cc
+++ b/src/node/ext/slice.cc
@@ -37,7 +37,6 @@
#include <grpc/support/alloc.h>
#include "slice.h"
-#include "byte_buffer.h"
namespace grpc {
namespace node {
@@ -93,9 +92,9 @@ Local<Value> CreateBufferFromSlice(const grpc_slice slice) {
Nan::EscapableHandleScope scope;
grpc_slice *slice_ptr = new grpc_slice;
*slice_ptr = grpc_slice_ref(slice);
- return scope.Escape(MakeFastBuffer(Nan::NewBuffer(
+ return scope.Escape(Nan::NewBuffer(
const_cast<char *>(reinterpret_cast<const char *>(GRPC_SLICE_START_PTR(*slice_ptr))),
- GRPC_SLICE_LENGTH(*slice_ptr), SliceFreeCallback, slice_ptr).ToLocalChecked()));
+ GRPC_SLICE_LENGTH(*slice_ptr), SliceFreeCallback, slice_ptr).ToLocalChecked());
}
} // namespace node
diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js
index 6abde2e17a..7158af775a 100644
--- a/src/node/performance/benchmark_server.js
+++ b/src/node/performance/benchmark_server.js
@@ -88,6 +88,13 @@ function streamingCall(call) {
});
}
+function makeUnaryGenericCall(response_size) {
+ var response = zeroBuffer(response_size);
+ return function unaryGenericCall(call, callback) {
+ callback(null, response);
+ };
+}
+
function makeStreamingGenericCall(response_size) {
var response = zeroBuffer(response_size);
return function streamingGenericCall(call) {
@@ -129,6 +136,7 @@ function BenchmarkServer(host, port, tls, generic, response_size) {
this.port = server.bind(host + ':' + port, server_creds);
if (generic) {
server.addService(genericService, {
+ unaryCall: makeUnaryGenericCall(response_size),
streamingCall: makeStreamingGenericCall(response_size)
});
} else {
diff --git a/src/node/performance/generic_service.js b/src/node/performance/generic_service.js
index ce09cc4336..c936ac30bc 100644
--- a/src/node/performance/generic_service.js
+++ b/src/node/performance/generic_service.js
@@ -34,8 +34,17 @@
var _ = require('lodash');
module.exports = {
+ 'unaryCall' : {
+ path: '/grpc.testing.BenchmarkService/UnaryCall',
+ requestStream: false,
+ responseStream: false,
+ requestSerialize: _.identity,
+ requestDeserialize: _.identity,
+ responseSerialize: _.identity,
+ responseDeserialize: _.identity
+ },
'streamingCall' : {
- path: '/grpc.testing/BenchmarkService',
+ path: '/grpc.testing.BenchmarkService/StreamingCall',
requestStream: true,
responseStream: true,
requestSerialize: _.identity,
diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js
index 38888a7219..fa864f9925 100644
--- a/src/node/performance/worker_service_impl.js
+++ b/src/node/performance/worker_service_impl.js
@@ -89,6 +89,7 @@ module.exports = function WorkerServiceImpl(benchmark_impl, server) {
default:
call.emit('error', new Error('Unsupported PayloadConfig type' +
setup.payload_config.payload));
+ return;
}
switch (setup.load_params.load) {
case 'closed_loop':
@@ -103,6 +104,7 @@ module.exports = function WorkerServiceImpl(benchmark_impl, server) {
default:
call.emit('error', new Error('Unsupported LoadParams type' +
setup.load_params.load));
+ return;
}
stats = client.mark();
call.write({
@@ -137,8 +139,27 @@ module.exports = function WorkerServiceImpl(benchmark_impl, server) {
switch (request.argtype) {
case 'setup':
console.log('ServerConfig %j', request.setup);
+ var setup = request.setup;
+ var resp_size, generic;
+ if (setup.payload_config) {
+ switch (setup.payload_config.payload) {
+ case 'bytebuf_params':
+ resp_size = setup.payload_config.bytebuf_params.resp_size;
+ generic = true;
+ break;
+ case 'simple_params':
+ resp_size = setup.payload_config.simple_params.resp_size;
+ generic = false;
+ break;
+ default:
+ call.emit('error', new Error('Unsupported PayloadConfig type' +
+ setup.payload_config.payload));
+ return;
+ }
+ }
server = new BenchmarkServer('[::]', request.setup.port,
- request.setup.security_params);
+ request.setup.security_params,
+ generic, resp_size);
server.on('started', function() {
stats = server.mark();
call.write({
diff --git a/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py b/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py
index c8ad9668ac..c58cb3ecf1 100644
--- a/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py
+++ b/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py
@@ -143,12 +143,13 @@ class ReflectionServicer(reflection_pb2.ServerReflectionServicer):
.encode(),))
-def enable_server_reflection(service_names, server):
+def enable_server_reflection(service_names, server, pool=None):
"""Enables server reflection on a server.
Args:
service_names: Iterable of fully-qualified service names available.
server: grpc.Server to which reflection service will be added.
+ pool: DescriptorPool object to use (descriptor_pool.Default() if None).
"""
reflection_pb2_grpc.add_ServerReflectionServicer_to_server(
- ReflectionServicer(service_names), server)
+ ReflectionServicer(service_names), server, pool)