aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c6
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.h2
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c4
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c275
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c22
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h18
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c20
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c101
9 files changed, 326 insertions, 128 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index 2b226c1bf7..e645eda7e3 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -41,9 +41,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/connector.h"
-#include "src/core/ext/client_channel/http_connect_handshaker.h"
-#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/connector.h"
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h
index f5d1025432..d55f6ed669 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.h
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h
@@ -34,7 +34,7 @@
#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H
#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H
-#include "src/core/ext/client_channel/connector.h"
+#include "src/core/ext/filters/client_channel/connector.h"
grpc_connector* grpc_chttp2_connector_create();
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 067ac35a5a..9c8505ddfa 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -38,8 +38,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/surface/api_trace.h"
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index f0c241d68e..119adfade1 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -38,9 +38,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
-#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 26f9449f4b..e2816b0e04 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -69,13 +69,21 @@
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
-#define DEFAULT_CLIENT_KEEPALIVE_TIME_S INT_MAX
-#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_S 20
+#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
+#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
+#define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000 /* 2 hours */
+#define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
-
-static int g_default_client_keepalive_time_s = DEFAULT_CLIENT_KEEPALIVE_TIME_S;
-static int g_default_client_keepalive_timeout_s =
- DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_S;
+#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
+
+static int g_default_client_keepalive_time_ms =
+ DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
+static int g_default_client_keepalive_timeout_ms =
+ DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
+static int g_default_server_keepalive_time_ms =
+ DEFAULT_SERVER_KEEPALIVE_TIME_MS;
+static int g_default_server_keepalive_timeout_ms =
+ DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
static bool g_default_keepalive_permit_without_calls =
DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
@@ -153,6 +161,8 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3
+#define DEFAULT_MAX_PING_STRIKES 2
+#define DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
/** keepalive-relevant functions */
static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -351,19 +361,35 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
.max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA,
.min_time_between_pings =
gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN),
+ .max_ping_strikes = DEFAULT_MAX_PING_STRIKES,
+ .min_ping_interval_without_data = gpr_time_from_millis(
+ DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, GPR_TIMESPAN),
};
- /* client-side keepalive setting */
- t->keepalive_time =
- g_default_client_keepalive_time_s == INT_MAX
- ? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_seconds(g_default_client_keepalive_time_s,
- GPR_TIMESPAN);
- t->keepalive_timeout =
- g_default_client_keepalive_timeout_s == INT_MAX
- ? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_seconds(g_default_client_keepalive_timeout_s,
- GPR_TIMESPAN);
+ /* Keepalive setting */
+ if (t->is_client) {
+ t->keepalive_time =
+ g_default_client_keepalive_time_ms == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(g_default_client_keepalive_time_ms,
+ GPR_TIMESPAN);
+ t->keepalive_timeout =
+ g_default_client_keepalive_timeout_ms == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(g_default_client_keepalive_timeout_ms,
+ GPR_TIMESPAN);
+ } else {
+ t->keepalive_time =
+ g_default_server_keepalive_time_ms == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(g_default_server_keepalive_time_ms,
+ GPR_TIMESPAN);
+ t->keepalive_timeout =
+ g_default_server_keepalive_timeout_ms == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(g_default_server_keepalive_timeout_ms,
+ GPR_TIMESPAN);
+ }
t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls;
if (channel_args) {
@@ -397,6 +423,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&channel_args->args[i],
(grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX});
} else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
+ t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_PING_STRIKES, 0, INT_MAX});
+ } else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) {
t->ping_policy.min_time_between_pings = gpr_time_from_millis(
grpc_channel_arg_get_integer(
@@ -404,6 +435,15 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
(grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0,
INT_MAX}),
GPR_TIMESPAN);
+ } else if (0 ==
+ strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_PING_INTERVAL_WITHOUT_DATA_MS)) {
+ t->ping_policy.min_ping_interval_without_data = gpr_time_from_millis(
+ grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){
+ DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, 0, INT_MAX}),
+ GPR_TIMESPAN);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer(
@@ -414,23 +454,27 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->enable_bdp_probe = grpc_channel_arg_get_integer(
&channel_args->args[i], (grpc_integer_options){1, 0, 1});
} else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_CLIENT_KEEPALIVE_TIME_S)) {
+ GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer(
&channel_args->args[i],
- (grpc_integer_options){g_default_client_keepalive_time_s, 1,
- INT_MAX});
+ (grpc_integer_options){t->is_client
+ ? g_default_client_keepalive_time_ms
+ : g_default_server_keepalive_time_ms,
+ 1, INT_MAX});
t->keepalive_time = value == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_seconds(value, GPR_TIMESPAN);
+ : gpr_time_from_millis(value, GPR_TIMESPAN);
} else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S)) {
+ GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
const int value = grpc_channel_arg_get_integer(
&channel_args->args[i],
- (grpc_integer_options){g_default_client_keepalive_timeout_s, 0,
- INT_MAX});
+ (grpc_integer_options){t->is_client
+ ? g_default_client_keepalive_timeout_ms
+ : g_default_server_keepalive_timeout_ms,
+ 0, INT_MAX});
t->keepalive_timeout = value == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_seconds(value, GPR_TIMESPAN);
+ : gpr_time_from_millis(value, GPR_TIMESPAN);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
t->keepalive_permit_without_calls =
@@ -488,8 +532,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ping_policy.max_pings_without_data;
t->ping_state.is_delayed_ping_timer_set = false;
- /** Start client-side keepalive pings */
- if (t->is_client) {
+ t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+
+ /* Start keepalive pings */
+ if (gpr_time_cmp(t->keepalive_time, gpr_inf_future(GPR_TIMESPAN)) != 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
@@ -918,6 +965,26 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
// GRPC_CHTTP2_IF_TRACING(
// gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
t->seen_goaway = 1;
+
+ /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
+ * data equal to “too_many_pings”, it should log the occurrence at a log level
+ * that is enabled by default and double the configured KEEPALIVE_TIME used
+ * for new connections on that channel. */
+ if (t->is_client && goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
+ grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0) {
+ gpr_log(GPR_ERROR,
+ "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
+ "data equal to \"too_many_pings\"");
+ double current_keepalive_time_ms =
+ gpr_timespec_to_micros(t->keepalive_time) / 1000;
+ t->keepalive_time =
+ current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis((int64_t)(current_keepalive_time_ms *
+ KEEPALIVE_TIME_BACKOFF_MULTIPLIER),
+ GPR_TIMESPAN);
+ }
+
/* lie: use transient failure from the transport to indicate goaway has been
* received */
connectivity_state_set(
@@ -1140,20 +1207,23 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
- grpc_transport_stream_op *op = stream_op;
- grpc_chttp2_transport *t = op->handler_private.args[0];
- grpc_chttp2_stream *s = op->handler_private.args[1];
+ grpc_transport_stream_op_batch *op = stream_op;
+ grpc_chttp2_stream *s = op->handler_private.extra_arg;
+ grpc_transport_stream_op_batch_payload *op_payload = op->payload;
+ grpc_chttp2_transport *t = s->t;
if (grpc_http_trace) {
- char *str = grpc_transport_stream_op_string(op);
+ char *str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str,
op->on_complete);
gpr_free(str);
if (op->send_initial_metadata) {
- log_metadata(op->send_initial_metadata, s->id, t->is_client, true);
+ log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
+ s->id, t->is_client, true);
}
if (op->send_trailing_metadata) {
- log_metadata(op->send_trailing_metadata, s->id, t->is_client, false);
+ log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
+ s->id, t->is_client, false);
}
}
@@ -1168,23 +1238,25 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE;
- if (op->collect_stats != NULL) {
+ if (op->collect_stats) {
GPR_ASSERT(s->collecting_stats == NULL);
- s->collecting_stats = op->collect_stats;
+ s->collecting_stats = op_payload->collect_stats.collect_stats;
on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
}
- if (op->cancel_error != GRPC_ERROR_NONE) {
- grpc_chttp2_cancel_stream(exec_ctx, t, s, op->cancel_error);
+ if (op->cancel_stream) {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s,
+ op_payload->cancel_stream.cancel_error);
}
- if (op->send_initial_metadata != NULL) {
+ if (op->send_initial_metadata) {
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
- s->send_initial_metadata = op->send_initial_metadata;
+ s->send_initial_metadata =
+ op_payload->send_initial_metadata.send_initial_metadata;
const size_t metadata_size =
- grpc_metadata_batch_size(op->send_initial_metadata);
+ grpc_metadata_batch_size(s->send_initial_metadata);
const size_t metadata_peer_limit =
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
@@ -1205,7 +1277,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
- if (contains_non_ok_status(op->send_initial_metadata)) {
+ if (contains_non_ok_status(s->send_initial_metadata)) {
s->seen_error = true;
}
if (!s->write_closed) {
@@ -1225,8 +1297,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GPR_ASSERT(s->id != 0);
grpc_chttp2_stream_write_type write_type =
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
- if (op->send_message != NULL &&
- (op->send_message->flags & GRPC_WRITE_BUFFER_HINT)) {
+ if (op->send_message &&
+ (op->payload->send_message.send_message->flags &
+ GRPC_WRITE_BUFFER_HINT)) {
write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
}
grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
@@ -1244,7 +1317,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
}
- if (op->send_message != NULL) {
+ if (op->send_message) {
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
@@ -1258,14 +1331,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GPR_ASSERT(s->fetching_send_message == NULL);
uint8_t *frame_hdr =
grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
- uint32_t flags = op->send_message->flags;
+ uint32_t flags = op_payload->send_message.send_message->flags;
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
- size_t len = op->send_message->length;
+ size_t len = op_payload->send_message.send_message->length;
frame_hdr[1] = (uint8_t)(len >> 24);
frame_hdr[2] = (uint8_t)(len >> 16);
frame_hdr[3] = (uint8_t)(len >> 8);
frame_hdr[4] = (uint8_t)(len);
- s->fetching_send_message = op->send_message;
+ s->fetching_send_message = op_payload->send_message.send_message;
s->fetched_send_message_length = 0;
s->next_message_end_offset = s->flow_controlled_bytes_written +
(int64_t)s->flow_controlled_buffer.length +
@@ -1282,14 +1355,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
}
- if (op->send_trailing_metadata != NULL) {
+ if (op->send_trailing_metadata) {
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
- s->send_trailing_metadata = op->send_trailing_metadata;
+ s->send_trailing_metadata =
+ op_payload->send_trailing_metadata.send_trailing_metadata;
s->write_buffering = false;
const size_t metadata_size =
- grpc_metadata_batch_size(op->send_trailing_metadata);
+ grpc_metadata_batch_size(s->send_trailing_metadata);
const size_t metadata_peer_limit =
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
@@ -1306,14 +1380,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
- if (contains_non_ok_status(op->send_trailing_metadata)) {
+ if (contains_non_ok_status(s->send_trailing_metadata)) {
s->seen_error = true;
}
if (s->write_closed) {
s->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_trailing_metadata_finished,
- grpc_metadata_batch_is_empty(op->send_trailing_metadata)
+ grpc_metadata_batch_is_empty(
+ op->payload->send_trailing_metadata.send_trailing_metadata)
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to send trailing metadata after "
@@ -1329,17 +1404,19 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
}
- if (op->recv_initial_metadata != NULL) {
+ if (op->recv_initial_metadata) {
GPR_ASSERT(s->recv_initial_metadata_ready == NULL);
- s->recv_initial_metadata_ready = op->recv_initial_metadata_ready;
- s->recv_initial_metadata = op->recv_initial_metadata;
+ s->recv_initial_metadata_ready =
+ op_payload->recv_initial_metadata.recv_initial_metadata_ready;
+ s->recv_initial_metadata =
+ op_payload->recv_initial_metadata.recv_initial_metadata;
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
}
- if (op->recv_message != NULL) {
+ if (op->recv_message) {
GPR_ASSERT(s->recv_message_ready == NULL);
- s->recv_message_ready = op->recv_message_ready;
- s->recv_message = op->recv_message;
+ s->recv_message_ready = op_payload->recv_message.recv_message_ready;
+ s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0 &&
(s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
@@ -1347,10 +1424,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
- if (op->recv_trailing_metadata != NULL) {
+ if (op->recv_trailing_metadata) {
GPR_ASSERT(s->recv_trailing_metadata_finished == NULL);
s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
- s->recv_trailing_metadata = op->recv_trailing_metadata;
+ s->recv_trailing_metadata =
+ op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
@@ -1363,19 +1441,19 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_stream *gs,
+ grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("perform_stream_op", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
if (grpc_http_trace) {
- char *str = grpc_transport_stream_op_string(op);
+ char *str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_DEBUG, "perform_stream_op[s=%p/%d]: %s", s, s->id, str);
gpr_free(str);
}
- op->handler_private.args[0] = gt;
- op->handler_private.args[1] = gs;
+ op->handler_private.extra_arg = gs;
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
grpc_closure_sched(
exec_ctx,
@@ -1448,11 +1526,26 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
+void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ gpr_log(GPR_DEBUG, "PING strike");
+ if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
+ t->ping_policy.max_ping_strikes != 0) {
+ send_goaway(exec_ctx, t,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
+ GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
+ /*The transport will be closed after the write is done */
+ close_transport_locked(
+ exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"));
+ }
+}
+
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op,
grpc_error *error_ignored) {
grpc_transport_op *op = stream_op;
- grpc_chttp2_transport *t = op->transport_private.args[0];
+ grpc_chttp2_transport *t = op->handler_private.extra_arg;
grpc_error *close_transport = op->disconnect_with_error;
if (op->on_connectivity_state_change != NULL) {
@@ -1498,10 +1591,10 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
char *msg = grpc_transport_op_string(op);
gpr_free(msg);
- op->transport_private.args[0] = gt;
+ op->handler_private.extra_arg = gt;
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
grpc_closure_sched(
- exec_ctx, grpc_closure_init(&op->transport_private.closure,
+ exec_ctx, grpc_closure_init(&op->handler_private.closure,
perform_transport_op_locked, op,
grpc_combiner_scheduler(t->combiner, false)),
GRPC_ERROR_NONE);
@@ -2114,6 +2207,10 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string);
}
+ /* Reset the keepalive ping timer */
+ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
+ grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
+ }
grpc_bdp_estimator_start_ping(&t->bdp_estimator);
}
@@ -2128,20 +2225,32 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
}
-void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args) {
+void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,
+ bool is_client) {
size_t i;
if (args) {
for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_CLIENT_KEEPALIVE_TIME_S)) {
- g_default_client_keepalive_time_s = grpc_channel_arg_get_integer(
- &args->args[i], (grpc_integer_options){
- g_default_client_keepalive_time_s, 1, INT_MAX});
- } else if (0 == strcmp(args->args[i].key,
- GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S)) {
- g_default_client_keepalive_timeout_s = grpc_channel_arg_get_integer(
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){g_default_client_keepalive_time_ms, 1,
+ INT_MAX});
+ if (is_client) {
+ g_default_client_keepalive_time_ms = value;
+ } else {
+ g_default_server_keepalive_time_ms = value;
+ }
+ } else if (0 ==
+ strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
+ const int value = grpc_channel_arg_get_integer(
&args->args[i],
- (grpc_integer_options){g_default_client_keepalive_timeout_s, 0,
+ (grpc_integer_options){g_default_client_keepalive_timeout_ms, 0,
INT_MAX});
+ if (is_client) {
+ g_default_client_keepalive_timeout_ms = value;
+ } else {
+ g_default_server_keepalive_timeout_ms = value;
+ }
} else if (0 == strcmp(args->args[i].key,
GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
g_default_keepalive_permit_without_calls =
@@ -2159,7 +2268,8 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_transport *t = arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) {
- if (t->keepalive_permit_without_calls || t->stream_map.count > 0) {
+ if (t->keepalive_permit_without_calls ||
+ grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE,
@@ -2172,6 +2282,13 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
&t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
}
+ } else if (error == GRPC_ERROR_CANCELLED && !(t->destroying || t->closed)) {
+ /* The keepalive ping timer may be cancelled by bdp */
+ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
+ grpc_timer_init(
+ exec_ctx, &t->keepalive_ping_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
+ &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping");
}
@@ -2213,8 +2330,8 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
"keepalive watchdog timeout"));
}
} else {
- /** The watchdog timer should have been cancelled by
- finish_keepalive_ping_locked. */
+ /* The watchdog timer should have been cancelled by
+ * finish_keepalive_ping_locked. */
if (error != GRPC_ERROR_CANCELLED) {
gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index 46dafdb62f..6016e43127 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -103,6 +103,28 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
if (p->is_ack) {
grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes);
} else {
+ if (!t->is_client) {
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec next_allowed_ping =
+ gpr_time_add(t->ping_recv_state.last_ping_recv_time,
+ t->ping_policy.min_ping_interval_without_data);
+
+ if (t->keepalive_permit_without_calls == 0 &&
+ grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ /* According to RFC1122, the interval of TCP Keep-Alive is default to
+ no less than two hours. When there is no outstanding streams, we
+ restrict the number of PINGS equivalent to TCP Keep-Alive. */
+ next_allowed_ping =
+ gpr_time_add(t->ping_recv_state.last_ping_recv_time,
+ gpr_time_from_seconds(7200, GPR_TIMESPAN));
+ }
+
+ if (gpr_time_cmp(next_allowed_ping, now) > 0) {
+ grpc_chttp2_add_ping_strike(exec_ctx, t);
+ }
+
+ t->ping_recv_state.last_ping_recv_time = now;
+ }
if (!g_disable_ping_ack) {
if (t->ping_ack_count == t->ping_ack_capacity) {
t->ping_ack_capacity = GPR_MAX(t->ping_ack_capacity * 3 / 2, 3);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 50993e4ecd..6eb848b8d7 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -97,6 +97,8 @@ typedef struct {
typedef struct {
gpr_timespec min_time_between_pings;
int max_pings_without_data;
+ int max_ping_strikes;
+ gpr_timespec min_ping_interval_without_data;
} grpc_chttp2_repeated_ping_policy;
typedef struct {
@@ -106,6 +108,11 @@ typedef struct {
bool is_delayed_ping_timer_set;
} grpc_chttp2_repeated_ping_state;
+typedef struct {
+ gpr_timespec last_ping_recv_time;
+ int ping_strikes;
+} grpc_chttp2_server_ping_recv_state;
+
/* deframer state for the overall http2 stream of bytes */
typedef enum {
/* prefix: one entry per http2 connection prefix byte */
@@ -316,6 +323,7 @@ struct grpc_chttp2_transport {
size_t ping_ack_count;
size_t ping_ack_capacity;
uint64_t *ping_acks;
+ grpc_chttp2_server_ping_recv_state ping_recv_state;
/** parser for headers */
grpc_chttp2_hpack_parser hpack_parser;
@@ -792,6 +800,13 @@ void grpc_chttp2_incoming_byte_stream_finished(
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint64_t id);
+/** Add a new ping strike to ping_recv_state.ping_strikes. If
+ ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY
+ with error code ENHANCE_YOUR_CALM and additional debug data resembling
+ “too_many_pings” followed by immediately closing the connection. */
+void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
+
typedef enum {
/* don't initiate a transport write, but piggyback on the next one */
GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
@@ -831,6 +846,7 @@ uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t);
/** Set the default keepalive configurations, must only be called at
initialization */
-void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args);
+void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,
+ bool is_client);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 0869056f56..be41b3d186 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -229,6 +229,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
now_writing = true;
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
+ if (!t->is_client) {
+ t->ping_recv_state.last_ping_recv_time =
+ gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+ }
}
/* send any window updates */
if (s->announce_window > 0) {
@@ -238,6 +243,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
s->id, s->announce_window, &s->stats.outgoing));
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
+ if (!t->is_client) {
+ t->ping_recv_state.last_ping_recv_time =
+ gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+ }
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce);
}
if (sent_initial_metadata) {
@@ -270,6 +280,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
send_bytes);
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
+ if (!t->is_client) {
+ t->ping_recv_state.last_ping_recv_time =
+ gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+ }
if (is_last_frame) {
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = true;
@@ -345,6 +360,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
0, announced, &throwaway_stats));
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
+ if (!t->is_client) {
+ t->ping_recv_state.last_ping_recv_time =
+ gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+ }
}
for (size_t i = 0; i < t->ping_ack_count; i++) {
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 952690eba7..0b9189558f 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -172,7 +172,7 @@ struct op_state {
};
struct op_and_state {
- grpc_transport_stream_op op;
+ grpc_transport_stream_op_batch op;
struct op_state state;
bool done;
struct stream_obj *s; /* Pointer back to the stream object */
@@ -187,7 +187,7 @@ struct op_storage {
struct stream_obj {
gpr_arena *arena;
struct op_and_state *oas;
- grpc_transport_stream_op *curr_op;
+ grpc_transport_stream_op_batch *curr_op;
grpc_cronet_transport *curr_ct;
grpc_stream *curr_gs;
bidirectional_stream *cbs;
@@ -298,12 +298,13 @@ static grpc_error *make_error_with_desc(int error_code, const char *desc) {
/*
Add a new stream op to op storage.
*/
-static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
+static void add_to_storage(struct stream_obj *s,
+ grpc_transport_stream_op_batch *op) {
struct op_storage *storage = &s->storage;
/* add new op at the beginning of the linked list. The memory is freed
in remove_from_storage */
struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state));
- memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op));
+ memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch));
memset(&new_op->state, 0, sizeof(new_op->state));
new_op->s = s;
new_op->done = false;
@@ -768,7 +769,7 @@ static bool header_has_authority(grpc_linked_mdelem *head) {
Op Execution: Decide if one of the actions contained in the stream op can be
executed. This is the heart of the state machine.
*/
-static bool op_can_be_run(grpc_transport_stream_op *curr_op,
+static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
struct stream_obj *s, struct op_state *op_state,
enum e_op_id op_id) {
struct op_state *stream_state = &s->state;
@@ -919,7 +920,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
*/
static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas) {
- grpc_transport_stream_op *stream_op = &oas->op;
+ grpc_transport_stream_op_batch *stream_op = &oas->op;
struct stream_obj *s = oas->s;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
struct op_state *stream_state = &s->state;
@@ -941,9 +942,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
char *url = NULL;
const char *method = "POST";
s->header_array.headers = NULL;
- convert_metadata_to_cronet_headers(
- stream_op->send_initial_metadata->list.head, t->host, &url,
- &s->header_array.headers, &s->header_array.count, &method);
+ convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata
+ .send_initial_metadata->list.head,
+ t->host, &url, &s->header_array.headers,
+ &s->header_array.count, &method);
s->header_array.capacity = s->header_array.count;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
@@ -971,8 +973,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer write_slice_buffer;
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
- grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
- stream_op->send_message->length, NULL);
+ grpc_byte_stream_next(
+ NULL, stream_op->payload->send_message.send_message, &slice,
+ stream_op->payload->send_message.send_message->length, NULL);
grpc_slice_buffer_add(&write_slice_buffer, slice);
if (write_slice_buffer.count != 1) {
/* Empty request not handled yet */
@@ -982,7 +985,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
if (write_slice_buffer.count > 0) {
size_t write_buffer_size;
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
- &write_buffer_size, stream_op->send_message->flags);
+ &write_buffer_size,
+ stream_op->payload->send_message.send_message->flags);
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
@@ -1029,20 +1033,28 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx,
+ stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
} else if (stream_state->state_callback_received[OP_FAILED]) {
- grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx,
+ stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
} else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
- grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx,
+ stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &oas->s->state.rs.initial_metadata,
- stream_op->recv_initial_metadata);
- grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE);
+ stream_op->payload->recv_initial_metadata.recv_initial_metadata);
+ grpc_closure_sched(
+ exec_ctx,
+ stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
}
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -1051,27 +1063,31 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ grpc_closure_sched(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ grpc_closure_sched(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ grpc_closure_sched(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "flush read");
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ grpc_closure_sched(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1110,8 +1126,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
}
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx, stream_op->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1161,9 +1178,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
if (stream_state->rs.compressed) {
stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS;
}
- *((grpc_byte_buffer **)stream_op->recv_message) =
+ *((grpc_byte_buffer **)stream_op->payload->recv_message.recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
- grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ grpc_closure_sched(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1187,12 +1205,12 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &oas->s->state.rs.trailing_metadata,
- stream_op->recv_trailing_metadata);
+ stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
- } else if (stream_op->cancel_error &&
+ } else if (stream_op->cancel_stream &&
op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
@@ -1204,7 +1222,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
}
stream_state->state_op_done[OP_CANCEL_ERROR] = true;
if (!stream_state->cancel_error) {
- stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error);
+ stream_state->cancel_error =
+ GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
}
} else if (stream_op->on_complete &&
op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
@@ -1283,18 +1302,22 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set) {}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_stream *gs,
+ grpc_transport_stream_op_batch *op) {
CRONET_LOG(GPR_DEBUG, "perform_stream_op");
if (op->send_initial_metadata &&
- header_has_authority(op->send_initial_metadata->list.head)) {
+ header_has_authority(op->payload->send_initial_metadata
+ .send_initial_metadata->list.head)) {
/* Cronet does not support :authority header field. We cancel the call when
this field is present in metadata */
- if (op->recv_initial_metadata_ready) {
- grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
- GRPC_ERROR_CANCELLED);
+ if (op->recv_initial_metadata) {
+ grpc_closure_sched(
+ exec_ctx,
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED);
}
- if (op->recv_message_ready) {
- grpc_closure_sched(exec_ctx, op->recv_message_ready,
+ if (op->recv_message) {
+ grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED);