diff options
Diffstat (limited to 'src/core/ext/transport')
9 files changed, 326 insertions, 128 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 2b226c1bf7..e645eda7e3 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -41,9 +41,9 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> -#include "src/core/ext/client_channel/connector.h" -#include "src/core/ext/client_channel/http_connect_handshaker.h" -#include "src/core/ext/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/connector.h" +#include "src/core/ext/filters/client_channel/http_connect_handshaker.h" +#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h index f5d1025432..d55f6ed669 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.h +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -34,7 +34,7 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H -#include "src/core/ext/client_channel/connector.h" +#include "src/core/ext/filters/client_channel/connector.h" grpc_connector* grpc_chttp2_connector_create(); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 067ac35a5a..9c8505ddfa 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -38,8 +38,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> -#include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/surface/api_trace.h" diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index f0c241d68e..119adfade1 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -38,9 +38,9 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> -#include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/client_channel/resolver_registry.h" -#include "src/core/ext/client_channel/uri_parser.h" +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/sockaddr_utils.h" diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 26f9449f4b..e2816b0e04 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -69,13 +69,21 @@ #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) -#define DEFAULT_CLIENT_KEEPALIVE_TIME_S INT_MAX -#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_S 20 +#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX +#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */ +#define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000 /* 2 hours */ +#define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */ #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false - -static int g_default_client_keepalive_time_s = DEFAULT_CLIENT_KEEPALIVE_TIME_S; -static int g_default_client_keepalive_timeout_s = - DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_S; +#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2 + +static int g_default_client_keepalive_time_ms = + DEFAULT_CLIENT_KEEPALIVE_TIME_MS; +static int g_default_client_keepalive_timeout_ms = + DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS; +static int g_default_server_keepalive_time_ms = + DEFAULT_SERVER_KEEPALIVE_TIME_MS; +static int g_default_server_keepalive_timeout_ms = + DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS; static bool g_default_keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; @@ -153,6 +161,8 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, #define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 3 +#define DEFAULT_MAX_PING_STRIKES 2 +#define DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */ /** keepalive-relevant functions */ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -351,19 +361,35 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, .max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA, .min_time_between_pings = gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN), + .max_ping_strikes = DEFAULT_MAX_PING_STRIKES, + .min_ping_interval_without_data = gpr_time_from_millis( + DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, GPR_TIMESPAN), }; - /* client-side keepalive setting */ - t->keepalive_time = - g_default_client_keepalive_time_s == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_seconds(g_default_client_keepalive_time_s, - GPR_TIMESPAN); - t->keepalive_timeout = - g_default_client_keepalive_timeout_s == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_seconds(g_default_client_keepalive_timeout_s, - GPR_TIMESPAN); + /* Keepalive setting */ + if (t->is_client) { + t->keepalive_time = + g_default_client_keepalive_time_ms == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis(g_default_client_keepalive_time_ms, + GPR_TIMESPAN); + t->keepalive_timeout = + g_default_client_keepalive_timeout_ms == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis(g_default_client_keepalive_timeout_ms, + GPR_TIMESPAN); + } else { + t->keepalive_time = + g_default_server_keepalive_time_ms == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis(g_default_server_keepalive_time_ms, + GPR_TIMESPAN); + t->keepalive_timeout = + g_default_server_keepalive_timeout_ms == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis(g_default_server_keepalive_timeout_ms, + GPR_TIMESPAN); + } t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls; if (channel_args) { @@ -397,6 +423,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &channel_args->args[i], (grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX}); } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { + t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_MAX_PING_STRIKES, 0, INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) { t->ping_policy.min_time_between_pings = gpr_time_from_millis( grpc_channel_arg_get_integer( @@ -404,6 +435,15 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, (grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0, INT_MAX}), GPR_TIMESPAN); + } else if (0 == + strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_ping_interval_without_data = gpr_time_from_millis( + grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){ + DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, 0, INT_MAX}), + GPR_TIMESPAN); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer( @@ -414,23 +454,27 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->enable_bdp_probe = grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){1, 0, 1}); } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_CLIENT_KEEPALIVE_TIME_S)) { + GRPC_ARG_KEEPALIVE_TIME_MS)) { const int value = grpc_channel_arg_get_integer( &channel_args->args[i], - (grpc_integer_options){g_default_client_keepalive_time_s, 1, - INT_MAX}); + (grpc_integer_options){t->is_client + ? g_default_client_keepalive_time_ms + : g_default_server_keepalive_time_ms, + 1, INT_MAX}); t->keepalive_time = value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_seconds(value, GPR_TIMESPAN); + : gpr_time_from_millis(value, GPR_TIMESPAN); } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S)) { + GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { const int value = grpc_channel_arg_get_integer( &channel_args->args[i], - (grpc_integer_options){g_default_client_keepalive_timeout_s, 0, - INT_MAX}); + (grpc_integer_options){t->is_client + ? g_default_client_keepalive_timeout_ms + : g_default_server_keepalive_timeout_ms, + 0, INT_MAX}); t->keepalive_timeout = value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_seconds(value, GPR_TIMESPAN); + : gpr_time_from_millis(value, GPR_TIMESPAN); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { t->keepalive_permit_without_calls = @@ -488,8 +532,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->ping_policy.max_pings_without_data; t->ping_state.is_delayed_ping_timer_set = false; - /** Start client-side keepalive pings */ - if (t->is_client) { + t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_recv_state.ping_strikes = 0; + + /* Start keepalive pings */ + if (gpr_time_cmp(t->keepalive_time, gpr_inf_future(GPR_TIMESPAN)) != 0) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init( @@ -918,6 +965,26 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, // GRPC_CHTTP2_IF_TRACING( // gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg)); t->seen_goaway = 1; + + /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug + * data equal to “too_many_pings”, it should log the occurrence at a log level + * that is enabled by default and double the configured KEEPALIVE_TIME used + * for new connections on that channel. */ + if (t->is_client && goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM && + grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0) { + gpr_log(GPR_ERROR, + "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug " + "data equal to \"too_many_pings\""); + double current_keepalive_time_ms = + gpr_timespec_to_micros(t->keepalive_time) / 1000; + t->keepalive_time = + current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis((int64_t)(current_keepalive_time_ms * + KEEPALIVE_TIME_BACKOFF_MULTIPLIER), + GPR_TIMESPAN); + } + /* lie: use transient failure from the transport to indicate goaway has been * received */ connectivity_state_set( @@ -1140,20 +1207,23 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); - grpc_transport_stream_op *op = stream_op; - grpc_chttp2_transport *t = op->handler_private.args[0]; - grpc_chttp2_stream *s = op->handler_private.args[1]; + grpc_transport_stream_op_batch *op = stream_op; + grpc_chttp2_stream *s = op->handler_private.extra_arg; + grpc_transport_stream_op_batch_payload *op_payload = op->payload; + grpc_chttp2_transport *t = s->t; if (grpc_http_trace) { - char *str = grpc_transport_stream_op_string(op); + char *str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str, op->on_complete); gpr_free(str); if (op->send_initial_metadata) { - log_metadata(op->send_initial_metadata, s->id, t->is_client, true); + log_metadata(op_payload->send_initial_metadata.send_initial_metadata, + s->id, t->is_client, true); } if (op->send_trailing_metadata) { - log_metadata(op->send_trailing_metadata, s->id, t->is_client, false); + log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata, + s->id, t->is_client, false); } } @@ -1168,23 +1238,25 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT; on_complete->error_data.error = GRPC_ERROR_NONE; - if (op->collect_stats != NULL) { + if (op->collect_stats) { GPR_ASSERT(s->collecting_stats == NULL); - s->collecting_stats = op->collect_stats; + s->collecting_stats = op_payload->collect_stats.collect_stats; on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT; } - if (op->cancel_error != GRPC_ERROR_NONE) { - grpc_chttp2_cancel_stream(exec_ctx, t, s, op->cancel_error); + if (op->cancel_stream) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, + op_payload->cancel_stream.cancel_error); } - if (op->send_initial_metadata != NULL) { + if (op->send_initial_metadata) { GPR_ASSERT(s->send_initial_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->send_initial_metadata_finished = add_closure_barrier(on_complete); - s->send_initial_metadata = op->send_initial_metadata; + s->send_initial_metadata = + op_payload->send_initial_metadata.send_initial_metadata; const size_t metadata_size = - grpc_metadata_batch_size(op->send_initial_metadata); + grpc_metadata_batch_size(s->send_initial_metadata); const size_t metadata_peer_limit = t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; @@ -1205,7 +1277,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); } else { - if (contains_non_ok_status(op->send_initial_metadata)) { + if (contains_non_ok_status(s->send_initial_metadata)) { s->seen_error = true; } if (!s->write_closed) { @@ -1225,8 +1297,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_ASSERT(s->id != 0); grpc_chttp2_stream_write_type write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; - if (op->send_message != NULL && - (op->send_message->flags & GRPC_WRITE_BUFFER_HINT)) { + if (op->send_message && + (op->payload->send_message.send_message->flags & + GRPC_WRITE_BUFFER_HINT)) { write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK; } grpc_chttp2_become_writable(exec_ctx, t, s, write_type, @@ -1244,7 +1317,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } - if (op->send_message != NULL) { + if (op->send_message) { on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { @@ -1258,14 +1331,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_ASSERT(s->fetching_send_message == NULL); uint8_t *frame_hdr = grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); - uint32_t flags = op->send_message->flags; + uint32_t flags = op_payload->send_message.send_message->flags; frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; - size_t len = op->send_message->length; + size_t len = op_payload->send_message.send_message->length; frame_hdr[1] = (uint8_t)(len >> 24); frame_hdr[2] = (uint8_t)(len >> 16); frame_hdr[3] = (uint8_t)(len >> 8); frame_hdr[4] = (uint8_t)(len); - s->fetching_send_message = op->send_message; + s->fetching_send_message = op_payload->send_message.send_message; s->fetched_send_message_length = 0; s->next_message_end_offset = s->flow_controlled_bytes_written + (int64_t)s->flow_controlled_buffer.length + @@ -1282,14 +1355,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } - if (op->send_trailing_metadata != NULL) { + if (op->send_trailing_metadata) { GPR_ASSERT(s->send_trailing_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->send_trailing_metadata_finished = add_closure_barrier(on_complete); - s->send_trailing_metadata = op->send_trailing_metadata; + s->send_trailing_metadata = + op_payload->send_trailing_metadata.send_trailing_metadata; s->write_buffering = false; const size_t metadata_size = - grpc_metadata_batch_size(op->send_trailing_metadata); + grpc_metadata_batch_size(s->send_trailing_metadata); const size_t metadata_peer_limit = t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; @@ -1306,14 +1380,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); } else { - if (contains_non_ok_status(op->send_trailing_metadata)) { + if (contains_non_ok_status(s->send_trailing_metadata)) { s->seen_error = true; } if (s->write_closed) { s->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_trailing_metadata_finished, - grpc_metadata_batch_is_empty(op->send_trailing_metadata) + grpc_metadata_batch_is_empty( + op->payload->send_trailing_metadata.send_trailing_metadata) ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Attempt to send trailing metadata after " @@ -1329,17 +1404,19 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } - if (op->recv_initial_metadata != NULL) { + if (op->recv_initial_metadata) { GPR_ASSERT(s->recv_initial_metadata_ready == NULL); - s->recv_initial_metadata_ready = op->recv_initial_metadata_ready; - s->recv_initial_metadata = op->recv_initial_metadata; + s->recv_initial_metadata_ready = + op_payload->recv_initial_metadata.recv_initial_metadata_ready; + s->recv_initial_metadata = + op_payload->recv_initial_metadata.recv_initial_metadata; grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); } - if (op->recv_message != NULL) { + if (op->recv_message) { GPR_ASSERT(s->recv_message_ready == NULL); - s->recv_message_ready = op->recv_message_ready; - s->recv_message = op->recv_message; + s->recv_message_ready = op_payload->recv_message.recv_message_ready; + s->recv_message = op_payload->recv_message.recv_message; if (s->id != 0 && (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) { incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0); @@ -1347,10 +1424,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } - if (op->recv_trailing_metadata != NULL) { + if (op->recv_trailing_metadata) { GPR_ASSERT(s->recv_trailing_metadata_finished == NULL); s->recv_trailing_metadata_finished = add_closure_barrier(on_complete); - s->recv_trailing_metadata = op->recv_trailing_metadata; + s->recv_trailing_metadata = + op_payload->recv_trailing_metadata.recv_trailing_metadata; s->final_metadata_requested = true; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } @@ -1363,19 +1441,19 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_transport_stream_op *op) { + grpc_stream *gs, + grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("perform_stream_op", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; if (grpc_http_trace) { - char *str = grpc_transport_stream_op_string(op); + char *str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op[s=%p/%d]: %s", s, s->id, str); gpr_free(str); } - op->handler_private.args[0] = gt; - op->handler_private.args[1] = gs; + op->handler_private.extra_arg = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_closure_sched( exec_ctx, @@ -1448,11 +1526,26 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } +void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + gpr_log(GPR_DEBUG, "PING strike"); + if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes && + t->ping_policy.max_ping_strikes != 0) { + send_goaway(exec_ctx, t, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); + /*The transport will be closed after the write is done */ + close_transport_locked( + exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings")); + } +} + static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { grpc_transport_op *op = stream_op; - grpc_chttp2_transport *t = op->transport_private.args[0]; + grpc_chttp2_transport *t = op->handler_private.extra_arg; grpc_error *close_transport = op->disconnect_with_error; if (op->on_connectivity_state_change != NULL) { @@ -1498,10 +1591,10 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; char *msg = grpc_transport_op_string(op); gpr_free(msg); - op->transport_private.args[0] = gt; + op->handler_private.extra_arg = gt; GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); grpc_closure_sched( - exec_ctx, grpc_closure_init(&op->transport_private.closure, + exec_ctx, grpc_closure_init(&op->handler_private.closure, perform_transport_op_locked, op, grpc_combiner_scheduler(t->combiner, false)), GRPC_ERROR_NONE); @@ -2114,6 +2207,10 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, if (grpc_http_trace) { gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string); } + /* Reset the keepalive ping timer */ + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + } grpc_bdp_estimator_start_ping(&t->bdp_estimator); } @@ -2128,20 +2225,32 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); } -void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args) { +void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, + bool is_client) { size_t i; if (args) { for (i = 0; i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_CLIENT_KEEPALIVE_TIME_S)) { - g_default_client_keepalive_time_s = grpc_channel_arg_get_integer( - &args->args[i], (grpc_integer_options){ - g_default_client_keepalive_time_s, 1, INT_MAX}); - } else if (0 == strcmp(args->args[i].key, - GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S)) { - g_default_client_keepalive_timeout_s = grpc_channel_arg_get_integer( + if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { + const int value = grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){g_default_client_keepalive_time_ms, 1, + INT_MAX}); + if (is_client) { + g_default_client_keepalive_time_ms = value; + } else { + g_default_server_keepalive_time_ms = value; + } + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { + const int value = grpc_channel_arg_get_integer( &args->args[i], - (grpc_integer_options){g_default_client_keepalive_timeout_s, 0, + (grpc_integer_options){g_default_client_keepalive_timeout_ms, 0, INT_MAX}); + if (is_client) { + g_default_client_keepalive_timeout_ms = value; + } else { + g_default_server_keepalive_timeout_ms = value; + } } else if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { g_default_keepalive_permit_without_calls = @@ -2159,7 +2268,8 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_transport *t = arg; GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) { - if (t->keepalive_permit_without_calls || t->stream_map.count > 0) { + if (t->keepalive_permit_without_calls || + grpc_chttp2_stream_map_size(&t->stream_map) > 0) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, @@ -2172,6 +2282,13 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); } + } else if (error == GRPC_ERROR_CANCELLED && !(t->destroying || t->closed)) { + /* The keepalive ping timer may be cancelled by bdp */ + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); } @@ -2213,8 +2330,8 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, "keepalive watchdog timeout")); } } else { - /** The watchdog timer should have been cancelled by - finish_keepalive_ping_locked. */ + /* The watchdog timer should have been cancelled by + * finish_keepalive_ping_locked. */ if (error != GRPC_ERROR_CANCELLED) { gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index 46dafdb62f..6016e43127 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -103,6 +103,28 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, if (p->is_ack) { grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes); } else { + if (!t->is_client) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec next_allowed_ping = + gpr_time_add(t->ping_recv_state.last_ping_recv_time, + t->ping_policy.min_ping_interval_without_data); + + if (t->keepalive_permit_without_calls == 0 && + grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + /* According to RFC1122, the interval of TCP Keep-Alive is default to + no less than two hours. When there is no outstanding streams, we + restrict the number of PINGS equivalent to TCP Keep-Alive. */ + next_allowed_ping = + gpr_time_add(t->ping_recv_state.last_ping_recv_time, + gpr_time_from_seconds(7200, GPR_TIMESPAN)); + } + + if (gpr_time_cmp(next_allowed_ping, now) > 0) { + grpc_chttp2_add_ping_strike(exec_ctx, t); + } + + t->ping_recv_state.last_ping_recv_time = now; + } if (!g_disable_ping_ack) { if (t->ping_ack_count == t->ping_ack_capacity) { t->ping_ack_capacity = GPR_MAX(t->ping_ack_capacity * 3 / 2, 3); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 50993e4ecd..6eb848b8d7 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -97,6 +97,8 @@ typedef struct { typedef struct { gpr_timespec min_time_between_pings; int max_pings_without_data; + int max_ping_strikes; + gpr_timespec min_ping_interval_without_data; } grpc_chttp2_repeated_ping_policy; typedef struct { @@ -106,6 +108,11 @@ typedef struct { bool is_delayed_ping_timer_set; } grpc_chttp2_repeated_ping_state; +typedef struct { + gpr_timespec last_ping_recv_time; + int ping_strikes; +} grpc_chttp2_server_ping_recv_state; + /* deframer state for the overall http2 stream of bytes */ typedef enum { /* prefix: one entry per http2 connection prefix byte */ @@ -316,6 +323,7 @@ struct grpc_chttp2_transport { size_t ping_ack_count; size_t ping_ack_capacity; uint64_t *ping_acks; + grpc_chttp2_server_ping_recv_state ping_recv_state; /** parser for headers */ grpc_chttp2_hpack_parser hpack_parser; @@ -792,6 +800,13 @@ void grpc_chttp2_incoming_byte_stream_finished( void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint64_t id); +/** Add a new ping strike to ping_recv_state.ping_strikes. If + ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY + with error code ENHANCE_YOUR_CALM and additional debug data resembling + “too_many_pings” followed by immediately closing the connection. */ +void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); + typedef enum { /* don't initiate a transport write, but piggyback on the next one */ GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK, @@ -831,6 +846,7 @@ uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t); /** Set the default keepalive configurations, must only be called at initialization */ -void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args); +void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, + bool is_client); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 0869056f56..be41b3d186 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -229,6 +229,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, now_writing = true; t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; + if (!t->is_client) { + t->ping_recv_state.last_ping_recv_time = + gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_recv_state.ping_strikes = 0; + } } /* send any window updates */ if (s->announce_window > 0) { @@ -238,6 +243,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, s->id, s->announce_window, &s->stats.outgoing)); t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; + if (!t->is_client) { + t->ping_recv_state.last_ping_recv_time = + gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_recv_state.ping_strikes = 0; + } GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce); } if (sent_initial_metadata) { @@ -270,6 +280,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, send_bytes); t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; + if (!t->is_client) { + t->ping_recv_state.last_ping_recv_time = + gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_recv_state.ping_strikes = 0; + } if (is_last_frame) { s->send_trailing_metadata = NULL; s->sent_trailing_metadata = true; @@ -345,6 +360,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, 0, announced, &throwaway_stats)); t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; + if (!t->is_client) { + t->ping_recv_state.last_ping_recv_time = + gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_recv_state.ping_strikes = 0; + } } for (size_t i = 0; i < t->ping_ack_count; i++) { diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 952690eba7..0b9189558f 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -172,7 +172,7 @@ struct op_state { }; struct op_and_state { - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; struct op_state state; bool done; struct stream_obj *s; /* Pointer back to the stream object */ @@ -187,7 +187,7 @@ struct op_storage { struct stream_obj { gpr_arena *arena; struct op_and_state *oas; - grpc_transport_stream_op *curr_op; + grpc_transport_stream_op_batch *curr_op; grpc_cronet_transport *curr_ct; grpc_stream *curr_gs; bidirectional_stream *cbs; @@ -298,12 +298,13 @@ static grpc_error *make_error_with_desc(int error_code, const char *desc) { /* Add a new stream op to op storage. */ -static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { +static void add_to_storage(struct stream_obj *s, + grpc_transport_stream_op_batch *op) { struct op_storage *storage = &s->storage; /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state)); - memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op)); + memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch)); memset(&new_op->state, 0, sizeof(new_op->state)); new_op->s = s; new_op->done = false; @@ -768,7 +769,7 @@ static bool header_has_authority(grpc_linked_mdelem *head) { Op Execution: Decide if one of the actions contained in the stream op can be executed. This is the heart of the state machine. */ -static bool op_can_be_run(grpc_transport_stream_op *curr_op, +static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, struct stream_obj *s, struct op_state *op_state, enum e_op_id op_id) { struct op_state *stream_state = &s->state; @@ -919,7 +920,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, */ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, struct op_and_state *oas) { - grpc_transport_stream_op *stream_op = &oas->op; + grpc_transport_stream_op_batch *stream_op = &oas->op; struct stream_obj *s = oas->s; grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; struct op_state *stream_state = &s->state; @@ -941,9 +942,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; - convert_metadata_to_cronet_headers( - stream_op->send_initial_metadata->list.head, t->host, &url, - &s->header_array.headers, &s->header_array.count, &method); + convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata + .send_initial_metadata->list.head, + t->host, &url, &s->header_array.headers, + &s->header_array.count, &method); s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); @@ -971,8 +973,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer write_slice_buffer; grpc_slice slice; grpc_slice_buffer_init(&write_slice_buffer); - grpc_byte_stream_next(NULL, stream_op->send_message, &slice, - stream_op->send_message->length, NULL); + grpc_byte_stream_next( + NULL, stream_op->payload->send_message.send_message, &slice, + stream_op->payload->send_message.send_message->length, NULL); grpc_slice_buffer_add(&write_slice_buffer, slice); if (write_slice_buffer.count != 1) { /* Empty request not handled yet */ @@ -982,7 +985,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (write_slice_buffer.count > 0) { size_t write_buffer_size; create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, - &write_buffer_size, stream_op->send_message->flags); + &write_buffer_size, + stream_op->payload->send_message.send_message->flags); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; @@ -1029,20 +1033,28 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.initial_metadata, - stream_op->recv_initial_metadata); - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + stream_op->payload->recv_initial_metadata.recv_initial_metadata); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -1051,27 +1063,31 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->flush_read) { CRONET_LOG(GPR_DEBUG, "flush read"); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1110,8 +1126,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, stream_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1161,9 +1178,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (stream_state->rs.compressed) { stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS; } - *((grpc_byte_buffer **)stream_op->recv_message) = + *((grpc_byte_buffer **)stream_op->payload->recv_message.recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1187,12 +1205,12 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (oas->s->state.rs.trailing_metadata_valid) { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.trailing_metadata, - stream_op->recv_trailing_metadata); + stream_op->payload->recv_trailing_metadata.recv_trailing_metadata); stream_state->rs.trailing_metadata_valid = false; } stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; - } else if (stream_op->cancel_error && + } else if (stream_op->cancel_stream && op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); @@ -1204,7 +1222,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } stream_state->state_op_done[OP_CANCEL_ERROR] = true; if (!stream_state->cancel_error) { - stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); + stream_state->cancel_error = + GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error); } } else if (stream_op->on_complete && op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { @@ -1283,18 +1302,22 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set) {} static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_transport_stream_op *op) { + grpc_stream *gs, + grpc_transport_stream_op_batch *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); if (op->send_initial_metadata && - header_has_authority(op->send_initial_metadata->list.head)) { + header_has_authority(op->payload->send_initial_metadata + .send_initial_metadata->list.head)) { /* Cronet does not support :authority header field. We cancel the call when this field is present in metadata */ - if (op->recv_initial_metadata_ready) { - grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, - GRPC_ERROR_CANCELLED); + if (op->recv_initial_metadata) { + grpc_closure_sched( + exec_ctx, + op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); } - if (op->recv_message_ready) { - grpc_closure_sched(exec_ctx, op->recv_message_ready, + if (op->recv_message) { + grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready, GRPC_ERROR_CANCELLED); } grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); |