diff options
Diffstat (limited to 'src/core')
39 files changed, 1034 insertions, 608 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index c3dca14305..b83c95275f 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -208,7 +208,7 @@ void grpc_channel_watch_connectivity_state( 7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, cq, tag)); - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); gpr_mu_init(&w->mu); GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w, diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c index 52c6e38c87..568bb2ba8d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -88,7 +88,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, // Record call finished, optionally setting client_failed_to_send and // received. grpc_grpclb_client_stats_add_call_finished( - false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */, !calld->send_initial_metadata_succeeded /* client_failed_to_send */, calld->recv_initial_metadata_succeeded /* known_received */, calld->client_stats); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index ebce801b37..bb9217d843 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -416,9 +416,7 @@ struct rr_connectivity_data { static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, bool log) { - if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { - return false; - } + if (server->drop) return false; const grpc_grpclb_ip_address *ip = &server->ip_address; if (server->port >> 16 != 0) { if (log) { @@ -462,7 +460,7 @@ static const grpc_lb_user_data_vtable lb_token_vtable = { static void parse_server(const grpc_grpclb_server *server, grpc_resolved_address *addr) { memset(addr, 0, sizeof(*addr)); - if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return; + if (server->drop) return; const uint16_t netorder_port = htons((uint16_t)server->port); /* the addresses are given in binary format (a in(6)_addr struct) in * server->ip_address.bytes. */ @@ -610,7 +608,7 @@ static bool pick_from_internal_rr_locked( if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { glb_policy->serverlist_index = 0; // Wrap-around. } - if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { + if (server->drop) { // Not using the RR policy, so unref it. if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", @@ -622,11 +620,8 @@ static bool pick_from_internal_rr_locked( // the client_load_reporting filter, because we do not create a // subchannel call (and therefore no client_load_reporting filter) // for dropped calls. - grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats); - grpc_grpclb_client_stats_add_call_finished( - server->drop_for_rate_limiting, server->drop_for_load_balancing, - false /* failed_to_send */, false /* known_received */, - wc_arg->client_stats); + grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token, + wc_arg->client_stats); grpc_grpclb_client_stats_unref(wc_arg->client_stats); if (force_async) { GPR_ASSERT(wc_arg->wrapped_closure != NULL); @@ -715,7 +710,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, return; } glb_policy->rr_policy = new_rr_policy; - grpc_error *rr_state_error = NULL; const grpc_connectivity_state rr_state = grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy, @@ -741,7 +735,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, rr_connectivity->state = rr_state; /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb"); grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); @@ -806,32 +800,31 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { rr_connectivity_data *rr_connectivity = arg; glb_lb_policy *glb_policy = rr_connectivity->glb_policy; - - const bool shutting_down = glb_policy->shutting_down; - bool unref_needed = false; - GRPC_ERROR_REF(error); - - if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) { - /* RR policy shutting down. Don't renew subscription and free the arg of - * this callback. In addition we need to stash away the current policy to - * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last - * one, the policy would be destroyed, alongside the lock, which would - * result in a use-after-free */ - unref_needed = true; + if (glb_policy->shutting_down) { + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "glb_rr_connectivity_cb"); gpr_free(rr_connectivity); - } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */ - update_lb_connectivity_status_locked( - exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error)); - /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ - grpc_lb_policy_notify_on_state_change_locked( - exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, - &rr_connectivity->on_change); + return; } - if (unref_needed) { + if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) { + /* An RR policy that has transitioned into the SHUTDOWN connectivity state + * should not be considered for picks or updates: the SHUTDOWN state is a + * sink, policies can't transition back from it. .*/ + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, + "rr_connectivity_shutdown"); + glb_policy->rr_policy = NULL; GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "rr_connectivity_cb"); + "glb_rr_connectivity_cb"); + gpr_free(rr_connectivity); + return; } - GRPC_ERROR_UNREF(error); + /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */ + update_lb_connectivity_status_locked( + exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error)); + /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */ + grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, + &rr_connectivity->state, + &rr_connectivity->on_change); } static void destroy_balancer_name(grpc_exec_ctx *exec_ctx, @@ -995,7 +988,6 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, gpr_free(glb_policy); return NULL; } - GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, glb_lb_channel_on_connectivity_changed_cb, glb_policy, grpc_combiner_scheduler(args->combiner)); @@ -1052,7 +1044,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_policy->pending_picks = NULL; pending_ping *pping = glb_policy->pending_pings; glb_policy->pending_pings = NULL; - if (glb_policy->rr_policy) { + if (glb_policy->rr_policy != NULL) { GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); } // We destroy the LB channel here because @@ -1309,15 +1301,14 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, } static bool load_report_counters_are_zero(grpc_grpclb_request *request) { + grpc_grpclb_dropped_call_counts *drop_entries = + request->client_stats.calls_finished_with_drop.arg; return request->client_stats.num_calls_started == 0 && request->client_stats.num_calls_finished == 0 && - request->client_stats.num_calls_finished_with_drop_for_rate_limiting == - 0 && - request->client_stats - .num_calls_finished_with_drop_for_load_balancing == 0 && request->client_stats.num_calls_finished_with_client_failed_to_send == 0 && - request->client_stats.num_calls_finished_known_received == 0; + request->client_stats.num_calls_finished_known_received == 0 && + (drop_entries == NULL || drop_entries->num_entries == 0); } static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -1332,7 +1323,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, // Construct message payload. GPR_ASSERT(glb_policy->client_load_report_payload == NULL); grpc_grpclb_request *request = - grpc_grpclb_load_report_request_create(glb_policy->client_stats); + grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats); // Skip client load report if the counters were all zero in the last // report and they are still zero in this one. if (load_report_counters_are_zero(request)) { @@ -1778,7 +1769,8 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, if (!glb_policy->watching_lb_channel) { // Watch the LB channel connectivity for connection. - glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT; + glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state( + glb_policy->lb_channel, true /* try to connect */); grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(glb_policy->lb_channel)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c index c762443b7c..5b62623145 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c @@ -18,8 +18,11 @@ #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" +#include <string.h> + #include <grpc/support/alloc.h> #include <grpc/support/atm.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> @@ -29,10 +32,11 @@ struct grpc_grpclb_client_stats { gpr_refcount refs; + // This field must only be accessed via *_locked() methods. + grpc_grpclb_dropped_call_counts* drop_token_counts; + // These fields may be accessed from multiple threads at a time. gpr_atm num_calls_started; gpr_atm num_calls_finished; - gpr_atm num_calls_finished_with_drop_for_rate_limiting; - gpr_atm num_calls_finished_with_drop_for_load_balancing; gpr_atm num_calls_finished_with_client_failed_to_send; gpr_atm num_calls_finished_known_received; }; @@ -51,6 +55,7 @@ grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) { if (gpr_unref(&client_stats->refs)) { + grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts); gpr_free(client_stats); } } @@ -61,21 +66,9 @@ void grpc_grpclb_client_stats_add_call_started( } void grpc_grpclb_client_stats_add_call_finished( - bool finished_with_drop_for_rate_limiting, - bool finished_with_drop_for_load_balancing, bool finished_with_client_failed_to_send, bool finished_known_received, grpc_grpclb_client_stats* client_stats) { gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1); - if (finished_with_drop_for_rate_limiting) { - gpr_atm_full_fetch_add( - &client_stats->num_calls_finished_with_drop_for_rate_limiting, - (gpr_atm)1); - } - if (finished_with_drop_for_load_balancing) { - gpr_atm_full_fetch_add( - &client_stats->num_calls_finished_with_drop_for_load_balancing, - (gpr_atm)1); - } if (finished_with_client_failed_to_send) { gpr_atm_full_fetch_add( &client_stats->num_calls_finished_with_client_failed_to_send, @@ -87,32 +80,70 @@ void grpc_grpclb_client_stats_add_call_finished( } } +void grpc_grpclb_client_stats_add_call_dropped_locked( + char* token, grpc_grpclb_client_stats* client_stats) { + // Increment num_calls_started and num_calls_finished. + gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1); + gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1); + // Record the drop. + if (client_stats->drop_token_counts == NULL) { + client_stats->drop_token_counts = + gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts)); + } + grpc_grpclb_dropped_call_counts* drop_token_counts = + client_stats->drop_token_counts; + for (size_t i = 0; i < drop_token_counts->num_entries; ++i) { + if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) { + ++drop_token_counts->token_counts[i].count; + return; + } + } + // Not found, so add a new entry. We double the size of the array each time. + size_t new_num_entries = 2; + while (new_num_entries < drop_token_counts->num_entries + 1) { + new_num_entries *= 2; + } + drop_token_counts->token_counts = + gpr_realloc(drop_token_counts->token_counts, + new_num_entries * sizeof(grpc_grpclb_drop_token_count)); + grpc_grpclb_drop_token_count* new_entry = + &drop_token_counts->token_counts[drop_token_counts->num_entries++]; + new_entry->token = gpr_strdup(token); + new_entry->count = 1; +} + static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) { *value = (int64_t)gpr_atm_acq_load(counter); gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value)); } -void grpc_grpclb_client_stats_get( +void grpc_grpclb_client_stats_get_locked( grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, int64_t* num_calls_finished, - int64_t* num_calls_finished_with_drop_for_rate_limiting, - int64_t* num_calls_finished_with_drop_for_load_balancing, int64_t* num_calls_finished_with_client_failed_to_send, - int64_t* num_calls_finished_known_received) { + int64_t* num_calls_finished_known_received, + grpc_grpclb_dropped_call_counts** drop_token_counts) { atomic_get_and_reset_counter(num_calls_started, &client_stats->num_calls_started); atomic_get_and_reset_counter(num_calls_finished, &client_stats->num_calls_finished); atomic_get_and_reset_counter( - num_calls_finished_with_drop_for_rate_limiting, - &client_stats->num_calls_finished_with_drop_for_rate_limiting); - atomic_get_and_reset_counter( - num_calls_finished_with_drop_for_load_balancing, - &client_stats->num_calls_finished_with_drop_for_load_balancing); - atomic_get_and_reset_counter( num_calls_finished_with_client_failed_to_send, &client_stats->num_calls_finished_with_client_failed_to_send); atomic_get_and_reset_counter( num_calls_finished_known_received, &client_stats->num_calls_finished_known_received); + *drop_token_counts = client_stats->drop_token_counts; + client_stats->drop_token_counts = NULL; +} + +void grpc_grpclb_dropped_call_counts_destroy( + grpc_grpclb_dropped_call_counts* drop_entries) { + if (drop_entries != NULL) { + for (size_t i = 0; i < drop_entries->num_entries; ++i) { + gpr_free(drop_entries->token_counts[i].token); + } + gpr_free(drop_entries->token_counts); + gpr_free(drop_entries); + } } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h index 4bb47d5c5c..c51e2a431a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h @@ -25,6 +25,16 @@ typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats; +typedef struct { + char* token; + int64_t count; +} grpc_grpclb_drop_token_count; + +typedef struct { + grpc_grpclb_drop_token_count* token_counts; + size_t num_entries; +} grpc_grpclb_dropped_call_counts; + grpc_grpclb_client_stats* grpc_grpclb_client_stats_create(); grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( grpc_grpclb_client_stats* client_stats); @@ -33,18 +43,23 @@ void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats); void grpc_grpclb_client_stats_add_call_started( grpc_grpclb_client_stats* client_stats); void grpc_grpclb_client_stats_add_call_finished( - bool finished_with_drop_for_rate_limiting, - bool finished_with_drop_for_load_balancing, bool finished_with_client_failed_to_send, bool finished_known_received, grpc_grpclb_client_stats* client_stats); -void grpc_grpclb_client_stats_get( +// This method is not thread-safe; caller must synchronize. +void grpc_grpclb_client_stats_add_call_dropped_locked( + char* token, grpc_grpclb_client_stats* client_stats); + +// This method is not thread-safe; caller must synchronize. +void grpc_grpclb_client_stats_get_locked( grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, int64_t* num_calls_finished, - int64_t* num_calls_finished_with_drop_for_rate_limiting, - int64_t* num_calls_finished_with_drop_for_load_balancing, int64_t* num_calls_finished_with_client_failed_to_send, - int64_t* num_calls_finished_known_received); + int64_t* num_calls_finished_known_received, + grpc_grpclb_dropped_call_counts** drop_token_counts); + +void grpc_grpclb_dropped_call_counts_destroy( + grpc_grpclb_dropped_call_counts* drop_entries); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \ */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index bec7c97a78..6fa29f326e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -76,7 +76,33 @@ static void populate_timestamp(gpr_timespec timestamp, timestamp_pb->nanos = timestamp.tv_nsec; } -grpc_grpclb_request *grpc_grpclb_load_report_request_create( +static bool encode_string(pb_ostream_t *stream, const pb_field_t *field, + void *const *arg) { + char *str = *arg; + if (!pb_encode_tag_for_field(stream, field)) return false; + return pb_encode_string(stream, (uint8_t *)str, strlen(str)); +} + +static bool encode_drops(pb_ostream_t *stream, const pb_field_t *field, + void *const *arg) { + grpc_grpclb_dropped_call_counts *drop_entries = *arg; + if (drop_entries == NULL) return true; + for (size_t i = 0; i < drop_entries->num_entries; ++i) { + if (!pb_encode_tag_for_field(stream, field)) return false; + grpc_lb_v1_ClientStatsPerToken drop_message; + drop_message.load_balance_token.funcs.encode = encode_string; + drop_message.load_balance_token.arg = drop_entries->token_counts[i].token; + drop_message.has_num_calls = true; + drop_message.num_calls = drop_entries->token_counts[i].count; + if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields, + &drop_message)) { + return false; + } + } + return true; +} + +grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked( grpc_grpclb_client_stats *client_stats) { grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request)); req->has_client_stats = true; @@ -84,18 +110,17 @@ grpc_grpclb_request *grpc_grpclb_load_report_request_create( populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp); req->client_stats.has_num_calls_started = true; req->client_stats.has_num_calls_finished = true; - req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true; - req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true; req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; req->client_stats.has_num_calls_finished_known_received = true; - grpc_grpclb_client_stats_get( + req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops; + grpc_grpclb_client_stats_get_locked( client_stats, &req->client_stats.num_calls_started, &req->client_stats.num_calls_finished, - &req->client_stats.num_calls_finished_with_drop_for_rate_limiting, - &req->client_stats.num_calls_finished_with_drop_for_load_balancing, &req->client_stats.num_calls_finished_with_client_failed_to_send, - &req->client_stats.num_calls_finished_known_received); + &req->client_stats.num_calls_finished_known_received, + (grpc_grpclb_dropped_call_counts **)&req->client_stats + .calls_finished_with_drop.arg); return req; } @@ -117,6 +142,11 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { } void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { + if (request->has_client_stats) { + grpc_grpclb_dropped_call_counts *drop_entries = + request->client_stats.calls_finished_with_drop.arg; + grpc_grpclb_dropped_call_counts_destroy(drop_entries); + } gpr_free(request); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index ef8d563edc..c4a98492c9 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -44,7 +44,7 @@ typedef struct { /** Create a request for a gRPC LB service under \a lb_service_name */ grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name); -grpc_grpclb_request *grpc_grpclb_load_report_request_create( +grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked( grpc_grpclb_client_stats *client_stats); /** Protocol Buffers v3-encode \a request */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c index fb119c7fc8..6a5d54c82a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c @@ -33,14 +33,19 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = { PB_LAST_FIELD }; -const pb_field_t grpc_lb_v1_ClientStats_fields[8] = { +const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3] = { + PB_FIELD( 1, STRING , OPTIONAL, CALLBACK, FIRST, grpc_lb_v1_ClientStatsPerToken, load_balance_token, load_balance_token, 0), + PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStatsPerToken, num_calls, load_balance_token, 0), + PB_LAST_FIELD +}; + +const pb_field_t grpc_lb_v1_ClientStats_fields[7] = { PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields), PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0), PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0), - PB_FIELD( 4, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0), - PB_FIELD( 5, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0), - PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0), + PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished, 0), PB_FIELD( 7, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0), + PB_FIELD( 8, MESSAGE , REPEATED, CALLBACK, OTHER, grpc_lb_v1_ClientStats, calls_finished_with_drop, num_calls_finished_known_received, &grpc_lb_v1_ClientStatsPerToken_fields), PB_LAST_FIELD }; @@ -62,12 +67,11 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = { PB_LAST_FIELD }; -const pb_field_t grpc_lb_v1_Server_fields[6] = { +const pb_field_t grpc_lb_v1_Server_fields[5] = { PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0), PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0), PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0), - PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0), - PB_FIELD( 5, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0), + PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop, load_balance_token, 0), PB_LAST_FIELD }; @@ -81,7 +85,7 @@ const pb_field_t grpc_lb_v1_Server_fields[6] = { * numbers or field sizes that are larger than what can fit in 8 or 16 bit * field descriptors. */ -PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) +PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) #endif #if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT) @@ -92,7 +96,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) * numbers or field sizes that are larger than what can fit in the default * 8 bit descriptors. */ -PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) +PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) #endif diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h index d3ae919ec2..93333d1aed 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h @@ -14,6 +14,13 @@ extern "C" { #endif /* Struct definitions */ +typedef struct _grpc_lb_v1_ClientStatsPerToken { + pb_callback_t load_balance_token; + bool has_num_calls; + int64_t num_calls; +/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStatsPerToken) */ +} grpc_lb_v1_ClientStatsPerToken; + typedef struct _grpc_lb_v1_Duration { bool has_seconds; int64_t seconds; @@ -36,10 +43,8 @@ typedef struct _grpc_lb_v1_Server { int32_t port; bool has_load_balance_token; char load_balance_token[50]; - bool has_drop_for_rate_limiting; - bool drop_for_rate_limiting; - bool has_drop_for_load_balancing; - bool drop_for_load_balancing; + bool has_drop; + bool drop; /* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */ } grpc_lb_v1_Server; @@ -58,14 +63,11 @@ typedef struct _grpc_lb_v1_ClientStats { int64_t num_calls_started; bool has_num_calls_finished; int64_t num_calls_finished; - bool has_num_calls_finished_with_drop_for_rate_limiting; - int64_t num_calls_finished_with_drop_for_rate_limiting; - bool has_num_calls_finished_with_drop_for_load_balancing; - int64_t num_calls_finished_with_drop_for_load_balancing; bool has_num_calls_finished_with_client_failed_to_send; int64_t num_calls_finished_with_client_failed_to_send; bool has_num_calls_finished_known_received; int64_t num_calls_finished_known_received; + pb_callback_t calls_finished_with_drop; /* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */ } grpc_lb_v1_ClientStats; @@ -107,39 +109,41 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse { #define grpc_lb_v1_Timestamp_init_default {false, 0, false, 0} #define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default} #define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""} -#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0} +#define grpc_lb_v1_ClientStatsPerToken_init_default {{{NULL}, NULL}, false, 0} +#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}} #define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default} #define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default} #define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default} -#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0, false, 0} +#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0} #define grpc_lb_v1_Duration_init_zero {false, 0, false, 0} #define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0} #define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero} #define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""} -#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0} +#define grpc_lb_v1_ClientStatsPerToken_init_zero {{{NULL}, NULL}, false, 0} +#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}} #define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero} #define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero} #define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero} -#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0, false, 0} +#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0} /* Field tags (for use in manual encoding/decoding) */ +#define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1 +#define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2 #define grpc_lb_v1_Duration_seconds_tag 1 #define grpc_lb_v1_Duration_nanos_tag 2 #define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1 #define grpc_lb_v1_Server_ip_address_tag 1 #define grpc_lb_v1_Server_port_tag 2 #define grpc_lb_v1_Server_load_balance_token_tag 3 -#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4 -#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5 +#define grpc_lb_v1_Server_drop_tag 4 #define grpc_lb_v1_Timestamp_seconds_tag 1 #define grpc_lb_v1_Timestamp_nanos_tag 2 #define grpc_lb_v1_ClientStats_timestamp_tag 1 #define grpc_lb_v1_ClientStats_num_calls_started_tag 2 #define grpc_lb_v1_ClientStats_num_calls_finished_tag 3 -#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4 -#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5 #define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6 #define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7 +#define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8 #define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1 #define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2 #define grpc_lb_v1_ServerList_servers_tag 1 @@ -154,22 +158,24 @@ extern const pb_field_t grpc_lb_v1_Duration_fields[3]; extern const pb_field_t grpc_lb_v1_Timestamp_fields[3]; extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3]; extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2]; -extern const pb_field_t grpc_lb_v1_ClientStats_fields[8]; +extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3]; +extern const pb_field_t grpc_lb_v1_ClientStats_fields[7]; extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3]; extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3]; extern const pb_field_t grpc_lb_v1_ServerList_fields[3]; -extern const pb_field_t grpc_lb_v1_Server_fields[6]; +extern const pb_field_t grpc_lb_v1_Server_fields[5]; /* Maximum encoded size of messages (where known) */ #define grpc_lb_v1_Duration_size 22 #define grpc_lb_v1_Timestamp_size 22 -#define grpc_lb_v1_LoadBalanceRequest_size 226 +#define grpc_lb_v1_LoadBalanceRequest_size (140 + grpc_lb_v1_ClientStats_size) #define grpc_lb_v1_InitialLoadBalanceRequest_size 131 -#define grpc_lb_v1_ClientStats_size 90 +/* grpc_lb_v1_ClientStatsPerToken_size depends on runtime parameters */ +/* grpc_lb_v1_ClientStats_size depends on runtime parameters */ #define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size) #define grpc_lb_v1_InitialLoadBalanceResponse_size 90 /* grpc_lb_v1_ServerList_size depends on runtime parameters */ -#define grpc_lb_v1_Server_size 85 +#define grpc_lb_v1_Server_size 83 /* Message IDs (where set with "msgid" option) */ #ifdef PB_MSGID diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index bc40165cfb..a7f7e9542c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -74,6 +74,9 @@ typedef struct round_robin_lb_policy { bool started_picking; /** are we shutting down? */ bool shutdown; + /** has the policy gotten into the GRPC_CHANNEL_SHUTDOWN? No picks can be + * service after this point, the policy will never transition out. */ + bool in_connectivity_shutdown; /** List of picks that are waiting on connectivity */ pending_pick *pending_picks; @@ -420,6 +423,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + GPR_ASSERT(!p->shutdown); + GPR_ASSERT(!p->in_connectivity_shutdown); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol); } @@ -532,6 +537,7 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); + p->in_connectivity_shutdown = true; new_state = GRPC_CHANNEL_SHUTDOWN; } else if (subchannel_list->num_transient_failures == p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c index 9065e33613..6ec3790a5f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -33,13 +33,13 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> -#include <nameser.h> #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/nameser.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/support/string.h" diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.c b/src/core/ext/transport/chttp2/transport/bin_decoder.c index fe7b7e4a42..5a99cbeffc 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.c +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.c @@ -118,6 +118,7 @@ bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx) { switch (input_tail) { case 3: ctx->output_cur[1] = COMPOSE_OUTPUT_BYTE_1(ctx->input_cur); + /* fallthrough */ case 2: ctx->output_cur[0] = COMPOSE_OUTPUT_BYTE_0(ctx->input_cur); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index ede05d57b7..221224111e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -114,11 +114,6 @@ static void connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state state, grpc_error *error, const char *reason); -static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - size_t max_size_hint, - size_t have_already); static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored); @@ -270,8 +265,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->endpoint_reading = 1; t->next_stream_id = is_client ? 1 : 2; t->is_client = is_client; - t->outgoing_window = DEFAULT_WINDOW; - t->incoming_window = DEFAULT_WINDOW; + t->flow_control.remote_window = DEFAULT_WINDOW; + t->flow_control.announced_window = DEFAULT_WINDOW; + t->flow_control.t = t; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->is_first_frame = true; grpc_connectivity_state_init( @@ -710,6 +706,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, post_destructive_reclaimer(exec_ctx, t); } + s->flow_control.s = s; GPR_TIMER_END("init_stream", 0); return 0; @@ -766,13 +763,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->byte_stream_error); - if (s->incoming_window_delta > 0) { - GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( - "destroy", t, s, s->incoming_window_delta); - } else if (s->incoming_window_delta < 0) { - GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( - "destroy", t, s, -s->incoming_window_delta); - } + grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); @@ -1485,9 +1476,16 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, s->recv_message_ready = op_payload->recv_message.recv_message_ready; s->recv_message = op_payload->recv_message.recv_message; if (s->id != 0) { - already_received = s->frame_storage.length; - incoming_byte_stream_update_flow_control( - exec_ctx, t, s, GRPC_HEADER_SIZE_IN_BYTES, already_received); + if (!s->read_closed) { + already_received = s->frame_storage.length; + grpc_chttp2_flowctl_incoming_bs_update( + &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES, + already_received); + grpc_chttp2_act_on_flowctl_action( + exec_ctx, + grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), + t, s); + } } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } @@ -2237,6 +2235,37 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, * INPUT PROCESSING - PARSING */ +void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, + grpc_chttp2_flowctl_action action, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + switch (action.send_stream_update) { + case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: + break; + case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + "immediate stream flowctl"); + break; + case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK, + "queue stream flowctl"); + break; + } + switch (action.send_transport_update) { + case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: + break; + case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: + grpc_chttp2_initiate_write(exec_ctx, t, "immediate transport flowctl"); + break; + // this is the same as no action b/c every time the transport enters the + // writing path it will maybe do an update + case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: + break; + } +} + static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, double bdp_dbl) { // initial window size bounded [1,2^31-1], but we set the min to 128. @@ -2248,9 +2277,10 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (delta == 0 || (delta > -bdp / 10 && delta < bdp / 10)) { return; } - if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { - gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string, - (int)bdp); + if (GRPC_TRACER_ON(grpc_bdp_estimator_trace) || + GRPC_TRACER_ON(grpc_flowctl_trace)) { + gpr_log(GPR_DEBUG, "%s | %p[%s] | update initial window size to %d", + t->peer_string, t, t->is_client ? "cli" : "svr", (int)bdp); } queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, (uint32_t)bdp); @@ -2350,8 +2380,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action.parse", 0); GPR_TIMER_BEGIN("post_parse_locked", 0); - if (t->initial_window_update != 0) { - if (t->initial_window_update > 0) { + if (t->flow_control.initial_window_update != 0) { + if (t->flow_control.initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { grpc_chttp2_become_writable( @@ -2359,7 +2389,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, "unstalled"); } } - t->initial_window_update = 0; + t->flow_control.initial_window_update = 0; } GPR_TIMER_END("post_parse_locked", 0); } @@ -2633,54 +2663,6 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, } } -static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - size_t max_size_hint, - size_t have_already) { - uint32_t max_recv_bytes; - uint32_t initial_window_size = - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - - /* clamp max recv hint to an allowable size */ - if (max_size_hint >= UINT32_MAX - initial_window_size) { - max_recv_bytes = UINT32_MAX - initial_window_size; - } else { - max_recv_bytes = (uint32_t)max_size_hint; - } - - /* account for bytes already received but unknown to higher layers */ - if (max_recv_bytes >= have_already) { - max_recv_bytes -= (uint32_t)have_already; - } else { - max_recv_bytes = 0; - } - - /* add some small lookahead to keep pipelines flowing */ - GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size); - if (s->incoming_window_delta < max_recv_bytes && !s->read_closed) { - uint32_t add_max_recv_bytes = - (uint32_t)(max_recv_bytes - s->incoming_window_delta); - grpc_chttp2_stream_write_type write_type = - GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED; - if (s->incoming_window_delta + initial_window_size < - (int64_t)have_already) { - write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; - } - GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA("op", t, s, - add_max_recv_bytes); - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window, - add_max_recv_bytes); - if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size - - (int64_t)s->announce_window > - (int64_t)initial_window_size / 2) { - write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK; - } - grpc_chttp2_become_writable(exec_ctx, t, s, write_type, - "read_incoming_stream"); - } -} - static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, void *argp, grpc_error *error_ignored) { @@ -2689,9 +2671,15 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s = bs->stream; size_t cur_length = s->frame_storage.length; - incoming_byte_stream_update_flow_control( - exec_ctx, t, s, bs->next_action.max_size_hint, cur_length); - + if (!s->read_closed) { + grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control, + bs->next_action.max_size_hint, + cur_length); + grpc_chttp2_act_on_flowctl_action( + exec_ctx, + grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), t, + s); + } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); if (s->frame_storage.length > 0) { grpc_slice_buffer_swap(&s->frame_storage, @@ -3001,83 +2989,6 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, } /******************************************************************************* - * TRACING - */ - -static char *format_flowctl_context_var(const char *context, const char *var, - int64_t val, uint32_t id) { - char *name; - if (context == NULL) { - name = gpr_strdup(var); - } else if (0 == strcmp(context, "t")) { - GPR_ASSERT(id == 0); - gpr_asprintf(&name, "TRANSPORT:%s", var); - } else if (0 == strcmp(context, "s")) { - GPR_ASSERT(id != 0); - gpr_asprintf(&name, "STREAM[%d]:%s", id, var); - } else { - gpr_asprintf(&name, "BAD_CONTEXT[%s][%d]:%s", context, id, var); - } - char *name_fld = gpr_leftpad(name, ' ', 64); - char *value; - gpr_asprintf(&value, "%" PRId64, val); - char *value_fld = gpr_leftpad(value, ' ', 8); - char *result; - gpr_asprintf(&result, "%s %s", name_fld, value_fld); - gpr_free(name); - gpr_free(name_fld); - gpr_free(value); - gpr_free(value_fld); - return result; -} - -void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, - grpc_chttp2_flowctl_op op, const char *context1, - const char *var1, const char *context2, - const char *var2, int is_client, - uint32_t stream_id, int64_t val1, int64_t val2) { - char *tmp_phase; - char *label1 = format_flowctl_context_var(context1, var1, val1, stream_id); - char *label2 = format_flowctl_context_var(context2, var2, val2, stream_id); - char *clisvr = is_client ? "client" : "server"; - char *prefix; - - tmp_phase = gpr_leftpad(phase, ' ', 8); - gpr_asprintf(&prefix, "FLOW %s: %s ", tmp_phase, clisvr); - gpr_free(tmp_phase); - - switch (op) { - case GRPC_CHTTP2_FLOWCTL_MOVE: - if (val2 != 0) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "%sMOVE %s <- %s giving %" PRId64, prefix, label1, label2, - val1 + val2); - } - break; - case GRPC_CHTTP2_FLOWCTL_CREDIT: - GPR_ASSERT(val2 >= 0); - if (val2 != 0) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "%sCREDIT %s by %s giving %" PRId64, prefix, label1, label2, - val1 + val2); - } - break; - case GRPC_CHTTP2_FLOWCTL_DEBIT: - GPR_ASSERT(val2 >= 0); - if (val2 != 0) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "%sDEBIT %s by %s giving %" PRId64, prefix, label1, label2, - val1 - val2); - } - break; - } - - gpr_free(label1); - gpr_free(label2); - gpr_free(prefix); -} - -/******************************************************************************* * INTEGRATION GLUE */ diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c new file mode 100644 index 0000000000..c9f7eabd43 --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/flow_control.c @@ -0,0 +1,369 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/ext/transport/chttp2/transport/internal.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/support/string.h" + +static uint32_t grpc_chttp2_target_announced_window( + const grpc_chttp2_transport_flowctl* tfc); + +#ifndef NDEBUG + +typedef struct { + int64_t remote_window; + int64_t target_window; + int64_t announced_window; + int64_t remote_window_delta; + int64_t local_window_delta; + int64_t announced_window_delta; +} shadow_flow_control; + +static void pretrace(shadow_flow_control* shadow_fc, + grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc) { + shadow_fc->remote_window = tfc->remote_window; + shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc); + shadow_fc->announced_window = tfc->announced_window; + if (sfc != NULL) { + shadow_fc->remote_window_delta = sfc->remote_window_delta; + shadow_fc->local_window_delta = sfc->local_window_delta; + shadow_fc->announced_window_delta = sfc->announced_window_delta; + } +} + +static char* fmt_str(int64_t old, int64_t new) { + char* str; + if (old != new) { + gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new); + } else { + gpr_asprintf(&str, "%" PRId64 "", old); + } + char* str_lp = gpr_leftpad(str, ' ', 30); + gpr_free(str); + return str_lp; +} + +static void posttrace(shadow_flow_control* shadow_fc, + grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc, char* reason) { + uint32_t acked_local_window = + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + uint32_t remote_window = + tfc->t->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window); + char* tlw_str = fmt_str(shadow_fc->target_window, + grpc_chttp2_target_announced_window(tfc)); + char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window); + char* srw_str; + char* slw_str; + char* saw_str; + if (sfc != NULL) { + srw_str = fmt_str(shadow_fc->remote_window_delta + remote_window, + sfc->remote_window_delta + remote_window); + slw_str = fmt_str(shadow_fc->local_window_delta + acked_local_window, + sfc->local_window_delta + acked_local_window); + saw_str = fmt_str(shadow_fc->announced_window_delta + acked_local_window, + sfc->announced_window_delta + acked_local_window); + } else { + srw_str = gpr_leftpad("", ' ', 30); + slw_str = gpr_leftpad("", ' ', 30); + saw_str = gpr_leftpad("", ' ', 30); + } + gpr_log(GPR_DEBUG, + "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", + tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr", + reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str); + gpr_free(trw_str); + gpr_free(tlw_str); + gpr_free(taw_str); + gpr_free(srw_str); + gpr_free(slw_str); + gpr_free(saw_str); +} + +static char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) { + switch (urgency) { + case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: + return "no action"; + case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: + return "update immediately"; + case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: + return "queue update"; + default: + GPR_UNREACHABLE_CODE(return "unknown"); + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + +static void trace_action(grpc_chttp2_flowctl_action action) { + gpr_log(GPR_DEBUG, "transport: %s, stream: %s", + urgency_to_string(action.send_transport_update), + urgency_to_string(action.send_stream_update)); +} + +#define PRETRACE(tfc, sfc) \ + shadow_flow_control shadow_fc; \ + GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc)) +#define POSTTRACE(tfc, sfc, reason) \ + GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason)) +#define TRACEACTION(action) GRPC_FLOW_CONTROL_IF_TRACING(trace_action(action)) +#else +#define PRETRACE(tfc, sfc) +#define POSTTRACE(tfc, sfc, reason) +#define TRACEACTION(action) +#endif + +/* How many bytes of incoming flow control would we like to advertise */ +static uint32_t grpc_chttp2_target_announced_window( + const grpc_chttp2_transport_flowctl* tfc) { + return (uint32_t)GPR_MIN( + (int64_t)((1u << 31) - 1), + tfc->announced_stream_total_over_incoming_window + + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); +} + +// we have sent data on the wire, we must track this in our bookkeeping for the +// remote peer's flow control. +void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc, + int64_t size) { + PRETRACE(tfc, sfc); + tfc->remote_window -= size; + sfc->remote_window_delta -= size; + POSTTRACE(tfc, sfc, " data sent"); +} + +static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc) { + if (sfc->announced_window_delta > 0) { + tfc->announced_stream_total_over_incoming_window -= + sfc->announced_window_delta; + } else { + tfc->announced_stream_total_under_incoming_window += + -sfc->announced_window_delta; + } +} + +static void announced_window_delta_postupdate( + grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { + if (sfc->announced_window_delta > 0) { + tfc->announced_stream_total_over_incoming_window += + sfc->announced_window_delta; + } else { + tfc->announced_stream_total_under_incoming_window -= + -sfc->announced_window_delta; + } +} + +// We have received data from the wire. We must track this in our own flow +// control bookkeeping. +// Returns an error if the incoming frame violates our flow control. +grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc, + int64_t incoming_frame_size) { + uint32_t sent_init_window = + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + uint32_t acked_init_window = + tfc->t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + PRETRACE(tfc, sfc); + if (incoming_frame_size > tfc->announced_window) { + char* msg; + gpr_asprintf(&msg, + "frame of size %" PRId64 " overflows local window of %" PRId64, + incoming_frame_size, tfc->announced_window); + grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + gpr_free(msg); + return err; + } + + if (sfc != NULL) { + int64_t acked_stream_window = + sfc->announced_window_delta + acked_init_window; + int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window; + if (incoming_frame_size > acked_stream_window) { + if (incoming_frame_size <= sent_stream_window) { + gpr_log( + GPR_ERROR, + "Incoming frame of size %" PRId64 + " exceeds local window size of %" PRId64 + ".\n" + "The (un-acked, future) window size would be %" PRId64 + " which is not exceeded.\n" + "This would usually cause a disconnection, but allowing it due to" + "broken HTTP2 implementations in the wild.\n" + "See (for example) https://github.com/netty/netty/issues/6520.", + incoming_frame_size, acked_stream_window, sent_stream_window); + } else { + char* msg; + gpr_asprintf(&msg, "frame of size %" PRId64 + " overflows local window of %" PRId64, + incoming_frame_size, acked_stream_window); + grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + gpr_free(msg); + return err; + } + } + + announced_window_delta_preupdate(tfc, sfc); + sfc->announced_window_delta -= incoming_frame_size; + announced_window_delta_postupdate(tfc, sfc); + sfc->local_window_delta -= incoming_frame_size; + } + + tfc->announced_window -= incoming_frame_size; + + POSTTRACE(tfc, sfc, " data recv"); + return GRPC_ERROR_NONE; +} + +// Returns a non zero announce integer if we should send a transport window +// update +uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( + grpc_chttp2_transport_flowctl* tfc) { + PRETRACE(tfc, NULL); + uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); + uint32_t threshold_to_send_transport_window_update = + tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4 + : target_announced_window / 2; + if (tfc->announced_window <= threshold_to_send_transport_window_update && + tfc->announced_window != target_announced_window) { + uint32_t announce = (uint32_t)GPR_CLAMP( + target_announced_window - tfc->announced_window, 0, UINT32_MAX); + tfc->announced_window += announce; + POSTTRACE(tfc, NULL, "t updt sent"); + return announce; + } + GRPC_FLOW_CONTROL_IF_TRACING( + gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc, + tfc->t->is_client ? "cli" : "svr")); + return 0; +} + +// Returns a non zero announce integer if we should send a stream window update +uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( + grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { + PRETRACE(tfc, sfc); + if (sfc->local_window_delta > sfc->announced_window_delta) { + uint32_t announce = (uint32_t)GPR_CLAMP( + sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX); + announced_window_delta_preupdate(tfc, sfc); + sfc->announced_window_delta += announce; + announced_window_delta_postupdate(tfc, sfc); + POSTTRACE(tfc, sfc, "s updt sent"); + return announce; + } + GRPC_FLOW_CONTROL_IF_TRACING( + gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc, + sfc->s->id, tfc->t->is_client ? "cli" : "svr")); + return 0; +} + +// we have received a WINDOW_UPDATE frame for a transport +void grpc_chttp2_flowctl_recv_transport_update( + grpc_chttp2_transport_flowctl* tfc, uint32_t size) { + PRETRACE(tfc, NULL); + tfc->remote_window += size; + POSTTRACE(tfc, NULL, "t updt recv"); +} + +// we have received a WINDOW_UPDATE frame for a stream +void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc, + uint32_t size) { + PRETRACE(tfc, sfc); + sfc->remote_window_delta += size; + POSTTRACE(tfc, sfc, "s updt recv"); +} + +void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc, + size_t max_size_hint, + size_t have_already) { + PRETRACE(tfc, sfc); + uint32_t max_recv_bytes; + uint32_t sent_init_window = + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + + /* clamp max recv hint to an allowable size */ + if (max_size_hint >= UINT32_MAX - sent_init_window) { + max_recv_bytes = UINT32_MAX - sent_init_window; + } else { + max_recv_bytes = (uint32_t)max_size_hint; + } + + /* account for bytes already received but unknown to higher layers */ + if (max_recv_bytes >= have_already) { + max_recv_bytes -= (uint32_t)have_already; + } else { + max_recv_bytes = 0; + } + + /* add some small lookahead to keep pipelines flowing */ + GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window); + if (sfc->local_window_delta < max_recv_bytes) { + uint32_t add_max_recv_bytes = + (uint32_t)(max_recv_bytes - sfc->local_window_delta); + sfc->local_window_delta += add_max_recv_bytes; + } + POSTTRACE(tfc, sfc, "app st recv"); +} + +void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc) { + announced_window_delta_preupdate(tfc, sfc); +} + +grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( + const grpc_chttp2_transport_flowctl* tfc, + const grpc_chttp2_stream_flowctl* sfc) { + grpc_chttp2_flowctl_action action; + memset(&action, 0, sizeof(action)); + uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); + if (tfc->announced_window < target_announced_window / 2) { + action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; + } + if (sfc != NULL && !sfc->s->read_closed) { + uint32_t sent_init_window = + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + if ((int64_t)sfc->local_window_delta > + (int64_t)sfc->announced_window_delta && + (int64_t)sfc->announced_window_delta + sent_init_window <= + sent_init_window / 2) { + action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; + } else if (sfc->local_window_delta > sfc->announced_window_delta) { + action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; + } + } + TRACEACTION(action); + return action; +} diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index e3e432a94a..057d3d9ed3 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -201,11 +201,13 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, } if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && parser->incoming_settings[id] != parser->value) { - t->initial_window_update += + t->flow_control.initial_window_update += (int64_t)parser->value - parser->incoming_settings[id]; - if (GRPC_TRACER_ON(grpc_http_trace)) { - gpr_log(GPR_DEBUG, "adding %d for initial_window change", - (int)t->initial_window_update); + if (GRPC_TRACER_ON(grpc_http_trace) || + GRPC_TRACER_ON(grpc_flowctl_trace)) { + gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change", + t, t->is_client ? "cli" : "svr", + (int)t->flow_control.initial_window_update); } } parser->incoming_settings[id] = parser->value; diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index 682be2c89b..65f3b01d77 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -95,8 +95,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse( if (t->incoming_stream_id != 0) { if (s != NULL) { - GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window_delta, - received_update); + grpc_chttp2_flowctl_recv_stream_update( + &t->flow_control, &s->flow_control, received_update); if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { grpc_chttp2_become_writable( exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, @@ -104,10 +104,10 @@ grpc_error *grpc_chttp2_window_update_parser_parse( } } } else { - bool was_zero = t->outgoing_window <= 0; - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", t, outgoing_window, - received_update); - bool is_zero = t->outgoing_window <= 0; + bool was_zero = t->flow_control.remote_window <= 0; + grpc_chttp2_flowctl_recv_transport_update(&t->flow_control, + received_update); + bool is_zero = t->flow_control.remote_window <= 0; if (was_zero && !is_zero) { grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control"); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index b538d1df17..f26f14dbec 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -213,6 +213,35 @@ typedef enum { GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED, } grpc_chttp2_keepalive_state; +typedef struct { + /** initial window change. This is tracked as we parse settings frames from + * the remote peer. If there is a positive delta, then we will make all + * streams readable since they may have become unstalled */ + int64_t initial_window_update; + + /** Our bookkeeping for the remote peer's available window */ + int64_t remote_window; + + /** calculating what we should give for local window: + we track the total amount of flow control over initial window size + across all streams: this is data that we want to receive right now (it + has an outstanding read) + and the total amount of flow control under initial window size across all + streams: this is data we've read early + we want to adjust incoming_window such that: + incoming_window = total_over - max(bdp - total_under, 0) */ + int64_t announced_stream_total_over_incoming_window; + int64_t announced_stream_total_under_incoming_window; + + /** This is out window according to what we have sent to our remote peer. The + * difference between this and target window is what we use to decide when + * to send WINDOW_UPDATE frames. */ + int64_t announced_window; + + // read only pointer back to transport for certain data + const grpc_chttp2_transport *t; +} grpc_chttp2_transport_flowctl; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -271,7 +300,6 @@ struct grpc_chttp2_transport { grpc_slice_buffer outbuf; /** hpack encoding */ grpc_chttp2_hpack_compressor hpack_compressor; - int64_t outgoing_window; /** is this a client? */ uint8_t is_client; @@ -328,21 +356,14 @@ struct grpc_chttp2_transport { /** parser for goaway frames */ grpc_chttp2_goaway_parser goaway_parser; - /** initial window change */ - int64_t initial_window_update; + grpc_chttp2_transport_flowctl flow_control; - /** window available for peer to send to us */ - int64_t incoming_window; - /** calculating what we should give for incoming window: - we track the total amount of flow control over initial window size - across all streams: this is data that we want to receive right now (it - has an outstanding read) - and the total amount of flow control under initial window size across all - streams: this is data we've read early - we want to adjust incoming_window such that: - incoming_window = total_over - max(bdp - total_under, 0) */ - int64_t stream_total_over_incoming_window; - int64_t stream_total_under_incoming_window; + /* bdp estimation */ + grpc_bdp_estimator bdp_estimator; + + /* pid controller */ + grpc_pid_controller pid_controller; + gpr_timespec last_pid_update; /* deframing */ grpc_chttp2_deframe_transport_state deframe_state; @@ -369,11 +390,8 @@ struct grpc_chttp2_transport { grpc_chttp2_write_cb *write_cb_pool; /* bdp estimator */ - grpc_bdp_estimator bdp_estimator; - grpc_pid_controller pid_controller; grpc_closure start_bdp_ping_locked; grpc_closure finish_bdp_ping_locked; - gpr_timespec last_pid_update; /* if non-NULL, close the transport with this error when writes are finished */ @@ -422,6 +440,25 @@ typedef enum { GPRC_METADATA_PUBLISHED_AT_CLOSE } grpc_published_metadata_method; +typedef struct { + /** window available for us to send to peer, over or under the initial window + * size of the transport... ie: + * remote_window = remote_window_delta + transport.initial_window_size */ + int64_t remote_window_delta; + + /** window available for peer to send to us (as a delta on + * transport.initial_window_size) + * local_window = local_window_delta + transport.initial_window_size */ + int64_t local_window_delta; + + /** window available for peer to send to us over this stream that we have + * announced to the peer */ + int64_t announced_window_delta; + + // read only pointer back to stream for data + const grpc_chttp2_stream *s; +} grpc_chttp2_stream_flowctl; + struct grpc_chttp2_stream { grpc_chttp2_transport *t; grpc_stream_refcount *refcount; @@ -435,10 +472,6 @@ struct grpc_chttp2_stream { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ uint32_t id; - /** window available for us to send to peer, over or under the initial window - * size of the transport... ie: - * outgoing_window = outgoing_window_delta + transport.initial_window_size */ - int64_t outgoing_window_delta; /** things the upper layers would like to send */ grpc_metadata_batch *send_initial_metadata; grpc_closure *send_initial_metadata_finished; @@ -505,10 +538,6 @@ struct grpc_chttp2_stream { grpc_error *forced_close_error; /** how many header frames have we received? */ uint8_t header_frames_received; - /** window available for peer to send to us (as a delta on - * transport.initial_window_size) - * incoming_window = incoming_window_delta + transport.initial_window_size */ - int64_t incoming_window_delta; /** parsing state for data frames */ /* Accessed only by transport thread when stream->pending_byte_stream == false * Accessed only by application thread when stream->pending_byte_stream == @@ -519,8 +548,9 @@ struct grpc_chttp2_stream { bool sent_initial_metadata; bool sent_trailing_metadata; - /** how much window should we announce? */ - uint32_t announce_window; + + grpc_chttp2_stream_flowctl flow_control; + grpc_slice_buffer flow_controlled_buffer; grpc_chttp2_write_cb *on_write_finished_cbs; @@ -621,6 +651,75 @@ bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t, bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); +/********* Flow Control ***************/ + +// we have sent data on the wire +void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc, + int64_t size); + +// we have received data from the wire +grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc, + int64_t incoming_frame_size); + +// returns an announce if we should send a transport update to our peer, +// else returns zero +uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( + grpc_chttp2_transport_flowctl *tfc); + +// returns an announce if we should send a stream update to our peer, else +// returns zero +uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( + grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc); + +// we have received a WINDOW_UPDATE frame for a transport +void grpc_chttp2_flowctl_recv_transport_update( + grpc_chttp2_transport_flowctl *tfc, uint32_t size); + +// we have received a WINDOW_UPDATE frame for a stream +void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc, + uint32_t size); + +// the application is asking for a certain amount of bytes +void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc, + size_t max_size_hint, + size_t have_already); + +void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc); + +typedef enum { + // Nothing to be done. + GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0, + // Initiate a write to update the initial window immediately. + GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY, + // Push the flow control update into a send buffer, to be sent + // out the next time a write is initiated. + GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE, +} grpc_chttp2_flowctl_urgency; + +typedef struct { + grpc_chttp2_flowctl_urgency send_stream_update; + grpc_chttp2_flowctl_urgency send_transport_update; +} grpc_chttp2_flowctl_action; + +// Reads the flow control data and returns and actionable struct that will tell +// chttp2 exactly what it needs to do +grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( + const grpc_chttp2_transport_flowctl *tfc, + const grpc_chttp2_stream_flowctl *sfc); + +// Takes in a flow control action and performs all the needed operations. +void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, + grpc_chttp2_flowctl_action action, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s); + +/********* End of Flow Control ***************/ + grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t, uint32_t id); grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx, @@ -651,126 +750,22 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, extern grpc_tracer_flag grpc_http_trace; extern grpc_tracer_flag grpc_flowctl_trace; +#ifndef NDEBUG +#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) \ + if (!(GRPC_TRACER_ON(grpc_flowctl_trace))) \ + ; \ + else \ + stmt +#else +#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) +#endif + #define GRPC_CHTTP2_IF_TRACING(stmt) \ if (!(GRPC_TRACER_ON(grpc_http_trace))) \ ; \ else \ stmt -typedef enum { - GRPC_CHTTP2_FLOWCTL_MOVE, - GRPC_CHTTP2_FLOWCTL_CREDIT, - GRPC_CHTTP2_FLOWCTL_DEBIT -} grpc_chttp2_flowctl_op; - -#define GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, id1, id2, dst_context, \ - dst_var, src_context, src_var) \ - do { \ - assert(id1 == id2); \ - if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \ - grpc_chttp2_flowctl_trace( \ - __FILE__, __LINE__, phase, GRPC_CHTTP2_FLOWCTL_MOVE, #dst_context, \ - #dst_var, #src_context, #src_var, transport->is_client, id1, \ - dst_context->dst_var, src_context->src_var); \ - } \ - dst_context->dst_var += src_context->src_var; \ - src_context->src_var = 0; \ - } while (0) - -#define GRPC_CHTTP2_FLOW_MOVE_STREAM(phase, transport, dst_context, dst_var, \ - src_context, src_var) \ - GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, dst_context->id, \ - src_context->id, dst_context, dst_var, \ - src_context, src_var) -#define GRPC_CHTTP2_FLOW_MOVE_TRANSPORT(phase, dst_context, dst_var, \ - src_context, src_var) \ - GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, dst_context, 0, 0, dst_context, dst_var, \ - src_context, src_var) - -#define GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, id, dst_context, \ - dst_var, amount) \ - do { \ - if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \ - grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \ - GRPC_CHTTP2_FLOWCTL_CREDIT, #dst_context, \ - #dst_var, NULL, #amount, transport->is_client, \ - id, dst_context->dst_var, amount); \ - } \ - dst_context->dst_var += amount; \ - } while (0) - -#define GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, dst_var, \ - amount) \ - GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, dst_context->id, \ - dst_context, dst_var, amount) -#define GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT(phase, dst_context, dst_var, amount) \ - GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \ - amount) - -#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE( \ - phase, transport, dst_context) \ - if (dst_context->incoming_window_delta < 0) { \ - transport->stream_total_under_incoming_window += \ - dst_context->incoming_window_delta; \ - } else if (dst_context->incoming_window_delta > 0) { \ - transport->stream_total_over_incoming_window -= \ - dst_context->incoming_window_delta; \ - } - -#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE( \ - phase, transport, dst_context) \ - if (dst_context->incoming_window_delta < 0) { \ - transport->stream_total_under_incoming_window -= \ - dst_context->incoming_window_delta; \ - } else if (dst_context->incoming_window_delta > 0) { \ - transport->stream_total_over_incoming_window += \ - dst_context->incoming_window_delta; \ - } - -#define GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( \ - phase, transport, dst_context, amount) \ - GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \ - dst_context); \ - GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, \ - incoming_window_delta, amount); \ - GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \ - dst_context); - -#define GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( \ - phase, transport, dst_context, amount) \ - GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \ - dst_context); \ - GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, \ - incoming_window_delta, amount); \ - GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \ - dst_context); - -#define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \ - dst_var, amount) \ - do { \ - if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \ - grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \ - GRPC_CHTTP2_FLOWCTL_DEBIT, #dst_context, \ - #dst_var, NULL, #amount, transport->is_client, \ - id, dst_context->dst_var, amount); \ - } \ - dst_context->dst_var -= amount; \ - } while (0) - -#define GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, dst_var, \ - amount) \ - GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, dst_context->id, \ - dst_context, dst_var, amount) -#define GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT(phase, dst_context, dst_var, amount) \ - GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \ - amount) - -void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, - grpc_chttp2_flowctl_op op, const char *context1, - const char *var1, const char *context2, - const char *var2, int is_client, - uint32_t stream_id, int64_t val1, int64_t val2); - void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *stream, grpc_error *error); void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, @@ -872,8 +867,6 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error); -uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t); - /** Set the default keepalive configurations, must only be called at initialization */ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 9d46cfa22e..18d163ee98 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -349,93 +349,25 @@ void grpc_chttp2_parsing_become_skip_parser(grpc_exec_ctx *exec_ctx, t->parser == grpc_chttp2_header_parser_parse); } -static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - uint32_t incoming_frame_size = t->incoming_frame_size; - if (incoming_frame_size > t->incoming_window) { - char *msg; - gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64, - t->incoming_frame_size, t->incoming_window); - grpc_error *err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); - gpr_free(msg); - return err; - } - - if (s != NULL) { - if (incoming_frame_size > - s->incoming_window_delta + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { - if (incoming_frame_size <= - s->incoming_window_delta + - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { - gpr_log( - GPR_ERROR, - "Incoming frame of size %d exceeds incoming window size of %" PRId64 - ".\n" - "The (un-acked, future) window size would be %" PRId64 - " which is not exceeded.\n" - "This would usually cause a disconnection, but allowing it due to " - "broken HTTP2 implementations in the wild.\n" - "See (for example) https://github.com/netty/netty/issues/6520.", - t->incoming_frame_size, - s->incoming_window_delta + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - s->incoming_window_delta + - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); - } else { - char *msg; - gpr_asprintf(&msg, - "frame of size %d overflows incoming window of %" PRId64, - t->incoming_frame_size, - s->incoming_window_delta + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); - grpc_error *err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); - gpr_free(msg); - return err; - } - } - - GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s, - incoming_frame_size); - if ((int64_t)s->incoming_window_delta - (int64_t)s->announce_window <= - -(int64_t)t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] / - 2) { - grpc_chttp2_become_writable(exec_ctx, t, s, - GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, - "window-update-required"); - } - s->received_bytes += incoming_frame_size; - } - - uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window, - incoming_frame_size); - if (t->incoming_window <= target_incoming_window / 2) { - grpc_chttp2_initiate_write(exec_ctx, t, "flow_control"); - } - - return GRPC_ERROR_NONE; -} - static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_stream *s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); grpc_error *err = GRPC_ERROR_NONE; - err = update_incoming_window(exec_ctx, t, s); + err = grpc_chttp2_flowctl_recv_data(&t->flow_control, + s == NULL ? NULL : &s->flow_control, + t->incoming_frame_size); + grpc_chttp2_act_on_flowctl_action( + exec_ctx, grpc_chttp2_flowctl_get_action( + &t->flow_control, s == NULL ? NULL : &s->flow_control), + t, s); if (err != GRPC_ERROR_NONE) { goto error_handler; } if (s == NULL) { return init_skip_frame_parser(exec_ctx, t, 0); } + s->received_bytes += t->incoming_frame_size; s->stats.incoming.framing_bytes += 9; if (err == GRPC_ERROR_NONE && s->read_closed) { return init_skip_frame_parser(exec_ctx, t, 0); diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 1bf5b34510..7cc85dea9c 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -150,12 +150,17 @@ void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t, void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { + GRPC_FLOW_CONTROL_IF_TRACING( + gpr_log(GPR_DEBUG, "stream %u stalled by transport", s->id)); stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream **s) { - return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); + bool ret = stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); + GRPC_FLOW_CONTROL_IF_TRACING(if (ret) gpr_log( + GPR_DEBUG, "stream %u un-stalled by transport", (*s)->id)); + return ret; } void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t, @@ -165,15 +170,23 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t, void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { + GRPC_FLOW_CONTROL_IF_TRACING( + gpr_log(GPR_DEBUG, "stream %u stalled by stream", s->id)); stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); } bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t, grpc_chttp2_stream **s) { - return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); + bool ret = stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); + GRPC_FLOW_CONTROL_IF_TRACING( + if (ret) gpr_log(GPR_DEBUG, "stream %u un-stalled by stream", (*s)->id)); + return ret; } bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); + bool ret = stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); + GRPC_FLOW_CONTROL_IF_TRACING( + if (ret) gpr_log(GPR_DEBUG, "stream %u un-stalled by stream", s->id)); + return ret; } diff --git a/src/core/ext/transport/chttp2/transport/varint.c b/src/core/ext/transport/chttp2/transport/varint.c index 5f93a23a94..0d94ddcbc3 100644 --- a/src/core/ext/transport/chttp2/transport/varint.c +++ b/src/core/ext/transport/chttp2/transport/varint.c @@ -37,12 +37,16 @@ void grpc_chttp2_hpack_write_varint_tail(uint32_t tail_value, uint8_t* target, switch (tail_length) { case 5: target[4] = (uint8_t)((tail_value >> 28) | 0x80); + /* fallthrough */ case 4: target[3] = (uint8_t)((tail_value >> 21) | 0x80); + /* fallthrough */ case 3: target[2] = (uint8_t)((tail_value >> 14) | 0x80); + /* fallthrough */ case 2: target[1] = (uint8_t)((tail_value >> 7) | 0x80); + /* fallthrough */ case 1: target[0] = (uint8_t)((tail_value) | 0x80); } diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index c3ede08343..80eb51ff0d 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -148,15 +148,6 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) { return true; } -/* How many bytes of incoming flow control would we like to advertise */ -uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) { - return (uint32_t)GPR_MIN( - (int64_t)((1u << 31) - 1), - t->stream_total_over_incoming_window + - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); -} - /* How many bytes would we like to put on the wire during a single syscall */ static uint32_t target_write_size(grpc_chttp2_transport *t) { return 1024 * 1024; @@ -201,7 +192,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( &t->hpack_compressor, t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); - if (t->outgoing_window > 0) { + if (t->flow_control.remote_window > 0) { while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) && stream_ref_if_not_destroyed(&s->refcount->refs)) { @@ -227,10 +218,12 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( bool sent_initial_metadata = s->sent_initial_metadata; bool now_writing = false; - GRPC_CHTTP2_IF_TRACING(gpr_log( - GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t, - t->is_client ? "CLIENT" : "SERVER", s->id, sent_initial_metadata, - s->send_initial_metadata != NULL, s->announce_window)); + GRPC_CHTTP2_IF_TRACING( + gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t, + t->is_client ? "CLIENT" : "SERVER", s->id, + sent_initial_metadata, s->send_initial_metadata != NULL, + (int)(s->flow_control.local_window_delta - + s->flow_control.announced_window_delta))); grpc_mdelem *extra_headers_for_trailing_metadata[2]; size_t num_extra_headers_for_trailing_metadata = 0; @@ -287,11 +280,12 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( sent_initial_metadata = true; } /* send any window updates */ - if (s->announce_window > 0) { - uint32_t announce = s->announce_window; - grpc_slice_buffer_add(&t->outbuf, - grpc_chttp2_window_update_create( - s->id, s->announce_window, &s->stats.outgoing)); + uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update( + &t->flow_control, &s->flow_control); + if (stream_announce > 0) { + grpc_slice_buffer_add( + &t->outbuf, grpc_chttp2_window_update_create(s->id, stream_announce, + &s->stats.outgoing)); t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { @@ -299,22 +293,21 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( gpr_inf_past(GPR_CLOCK_MONOTONIC); t->ping_recv_state.ping_strikes = 0; } - GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce); } if (sent_initial_metadata) { /* send any body bytes, if allowed by flow control */ if (s->flow_controlled_buffer.length > 0 || (s->stream_compression_send_enabled && s->compressed_data_buffer->length > 0)) { - uint32_t stream_outgoing_window = (uint32_t)GPR_MAX( + uint32_t stream_remote_window = (uint32_t)GPR_MAX( 0, - s->outgoing_window_delta + + s->flow_control.remote_window_delta + (int64_t)t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); uint32_t max_outgoing = (uint32_t)GPR_MIN( t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - GPR_MIN(stream_outgoing_window, t->outgoing_window)); + GPR_MIN(stream_remote_window, t->flow_control.remote_window)); if (max_outgoing > 0) { bool is_last_data_frame = false; bool is_last_frame = false; @@ -335,10 +328,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_chttp2_encode_data(s->id, s->compressed_data_buffer, send_bytes, is_last_frame, &s->stats.outgoing, &t->outbuf); - GRPC_CHTTP2_FLOW_DEBIT_STREAM( - "write", t, s, outgoing_window_delta, send_bytes); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, - send_bytes); + grpc_chttp2_flowctl_sent_data(&t->flow_control, + &s->flow_control, send_bytes); max_outgoing -= send_bytes; if (s->compressed_data_buffer->length == 0) { s->sending_bytes += s->uncompressed_data_size; @@ -367,10 +358,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, is_last_frame, &s->stats.outgoing, &t->outbuf); - GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta, + grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control, send_bytes); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, - send_bytes); s->sending_bytes += send_bytes; } t->ping_state.pings_before_data_required = @@ -396,10 +385,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } - } else if (t->outgoing_window == 0) { + } else if (t->flow_control.remote_window == 0) { grpc_chttp2_list_add_stalled_by_transport(t, s); now_writing = true; - } else if (stream_outgoing_window == 0) { + } else if (stream_remote_window == 0) { grpc_chttp2_list_add_stalled_by_stream(t, s); now_writing = true; } @@ -453,22 +442,15 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } } - /* if the grpc_chttp2_transport is ready to send a window update, do so here - also; 3/4 is a magic number that will likely get tuned soon */ - uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t); - uint32_t threshold_to_send_transport_window_update = - t->outbuf.count > 0 ? 3 * target_incoming_window / 4 - : target_incoming_window / 2; - if (t->incoming_window <= threshold_to_send_transport_window_update && - t->incoming_window != target_incoming_window) { + uint32_t transport_announce = + grpc_chttp2_flowctl_maybe_send_transport_update(&t->flow_control); + if (transport_announce) { maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); - uint32_t announced = (uint32_t)GPR_CLAMP( - target_incoming_window - t->incoming_window, 0, UINT32_MAX); - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("write", t, incoming_window, announced); grpc_transport_one_way_stats throwaway_stats; - grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create( - 0, announced, &throwaway_stats)); + grpc_slice_buffer_add( + &t->outbuf, grpc_chttp2_window_update_create(0, transport_announce, + &throwaway_stats)); t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 29dfa885de..776e0765fe 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -968,6 +968,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); + if (url) { + gpr_free(url); + } unsigned int header_index; for (header_index = 0; header_index < s->header_array.count; header_index++) { diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index c72c37e2b5..9b66987b68 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -73,10 +73,8 @@ static const grpc_closure_scheduler_vtable finally_scheduler = { static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); grpc_combiner *grpc_combiner_create(void) { - grpc_combiner *lock = gpr_malloc(sizeof(*lock)); + grpc_combiner *lock = gpr_zalloc(sizeof(*lock)); gpr_ref_init(&lock->refs, 1); - lock->next_combiner_on_this_exec_ctx = NULL; - lock->time_to_execute_final_list = false; lock->scheduler.vtable = &scheduler; lock->finally_scheduler.vtable = &finally_scheduler; gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); diff --git a/src/core/lib/iomgr/nameser.h b/src/core/lib/iomgr/nameser.h new file mode 100644 index 0000000000..daed6de518 --- /dev/null +++ b/src/core/lib/iomgr/nameser.h @@ -0,0 +1,104 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_NAMESER_H +#define GRPC_CORE_LIB_IOMGR_NAMESER_H + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_HAVE_ARPA_NAMESER + +#include <arpa/nameser.h> + +#else /* GRPC_HAVE_ARPA_NAMESER */ + +typedef enum __ns_class { + ns_c_invalid = 0, /* Cookie. */ + ns_c_in = 1, /* Internet. */ + ns_c_2 = 2, /* unallocated/unsupported. */ + ns_c_chaos = 3, /* MIT Chaos-net. */ + ns_c_hs = 4, /* MIT Hesiod. */ + /* Query class values which do not appear in resource records */ + ns_c_none = 254, /* for prereq. sections in update requests */ + ns_c_any = 255, /* Wildcard match. */ + ns_c_max = 65536 +} ns_class; + +typedef enum __ns_type { + ns_t_invalid = 0, /* Cookie. */ + ns_t_a = 1, /* Host address. */ + ns_t_ns = 2, /* Authoritative server. */ + ns_t_md = 3, /* Mail destination. */ + ns_t_mf = 4, /* Mail forwarder. */ + ns_t_cname = 5, /* Canonical name. */ + ns_t_soa = 6, /* Start of authority zone. */ + ns_t_mb = 7, /* Mailbox domain name. */ + ns_t_mg = 8, /* Mail group member. */ + ns_t_mr = 9, /* Mail rename name. */ + ns_t_null = 10, /* Null resource record. */ + ns_t_wks = 11, /* Well known service. */ + ns_t_ptr = 12, /* Domain name pointer. */ + ns_t_hinfo = 13, /* Host information. */ + ns_t_minfo = 14, /* Mailbox information. */ + ns_t_mx = 15, /* Mail routing information. */ + ns_t_txt = 16, /* Text strings. */ + ns_t_rp = 17, /* Responsible person. */ + ns_t_afsdb = 18, /* AFS cell database. */ + ns_t_x25 = 19, /* X_25 calling address. */ + ns_t_isdn = 20, /* ISDN calling address. */ + ns_t_rt = 21, /* Router. */ + ns_t_nsap = 22, /* NSAP address. */ + ns_t_nsap_ptr = 23, /* Reverse NSAP lookup (deprecated). */ + ns_t_sig = 24, /* Security signature. */ + ns_t_key = 25, /* Security key. */ + ns_t_px = 26, /* X.400 mail mapping. */ + ns_t_gpos = 27, /* Geographical position (withdrawn). */ + ns_t_aaaa = 28, /* Ip6 Address. */ + ns_t_loc = 29, /* Location Information. */ + ns_t_nxt = 30, /* Next domain (security). */ + ns_t_eid = 31, /* Endpoint identifier. */ + ns_t_nimloc = 32, /* Nimrod Locator. */ + ns_t_srv = 33, /* Server Selection. */ + ns_t_atma = 34, /* ATM Address */ + ns_t_naptr = 35, /* Naming Authority PoinTeR */ + ns_t_kx = 36, /* Key Exchange */ + ns_t_cert = 37, /* Certification record */ + ns_t_a6 = 38, /* IPv6 address (deprecates AAAA) */ + ns_t_dname = 39, /* Non-terminal DNAME (for IPv6) */ + ns_t_sink = 40, /* Kitchen sink (experimentatl) */ + ns_t_opt = 41, /* EDNS0 option (meta-RR) */ + ns_t_apl = 42, /* Address prefix list (RFC3123) */ + ns_t_ds = 43, /* Delegation Signer (RFC4034) */ + ns_t_sshfp = 44, /* SSH Key Fingerprint (RFC4255) */ + ns_t_rrsig = 46, /* Resource Record Signature (RFC4034) */ + ns_t_nsec = 47, /* Next Secure (RFC4034) */ + ns_t_dnskey = 48, /* DNS Public Key (RFC4034) */ + ns_t_tkey = 249, /* Transaction key */ + ns_t_tsig = 250, /* Transaction signature. */ + ns_t_ixfr = 251, /* Incremental zone transfer. */ + ns_t_axfr = 252, /* Transfer zone of authority. */ + ns_t_mailb = 253, /* Transfer mailbox records. */ + ns_t_maila = 254, /* Transfer mail agent records. */ + ns_t_any = 255, /* Wildcard match. */ + ns_t_zxfr = 256, /* BIND-specific, nonstandard. */ + ns_t_max = 65536 +} ns_type; + +#endif /* GRPC_HAVE_ARPA_NAMESER */ + +#endif /* GRPC_CORE_LIB_IOMGR_NAMESER_H */ diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index f5d15b4850..c12058f890 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -24,6 +24,7 @@ #if defined(GRPC_UV) // Do nothing #elif defined(GPR_MANYLINUX1) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 @@ -51,6 +52,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_LINUX) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 @@ -82,6 +84,7 @@ #define GRPC_POSIX_SOCKETUTILS #endif #elif defined(GPR_APPLE) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 @@ -93,6 +96,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_FREEBSD) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 @@ -104,6 +108,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_NACL) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 diff --git a/src/core/lib/json/json_reader.c b/src/core/lib/json/json_reader.c index 75f59e0912..094a35176c 100644 --- a/src/core/lib/json/json_reader.c +++ b/src/core/lib/json/json_reader.c @@ -178,6 +178,7 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader *reader) { json_reader_string_clear(reader); reader->state = GRPC_JSON_STATE_VALUE_END; /* The missing break here is intentional. */ + /* fallthrough */ case GRPC_JSON_STATE_VALUE_END: case GRPC_JSON_STATE_OBJECT_KEY_BEGIN: diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 239a211c0b..b9da6e16b2 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -147,7 +147,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, goto done; } // Get unused bytes. - unsigned char *unused_bytes = NULL; + const unsigned char *unused_bytes = NULL; size_t unused_bytes_size = 0; result = tsi_handshaker_result_get_unused_bytes( h->handshaker_result, &unused_bytes, &unused_bytes_size); diff --git a/src/core/lib/support/murmur_hash.c b/src/core/lib/support/murmur_hash.c index f329611818..f06b970de7 100644 --- a/src/core/lib/support/murmur_hash.c +++ b/src/core/lib/support/murmur_hash.c @@ -62,8 +62,10 @@ uint32_t gpr_murmur_hash3(const void *key, size_t len, uint32_t seed) { switch (len & 3) { case 3: k1 ^= ((uint32_t)tail[2]) << 16; + /* fallthrough */ case 2: k1 ^= ((uint32_t)tail[1]) << 8; + /* fallthrough */ case 1: k1 ^= tail[0]; k1 *= c1; diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index ef8405cca8..55934964f3 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -18,6 +18,7 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/completion_queue.h" @@ -49,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, alarm->cq = cq; alarm->tag = tag; - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index bfd2b45fcb..e18874d054 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -648,6 +648,8 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, static grpc_error *error_from_status(grpc_status_code status, const char *description) { + // copying 'description' is needed to ensure the grpc_call_cancel_with_status + // guarantee that can be short-lived. return grpc_error_set_int( grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description), GRPC_ERROR_STR_GRPC_MESSAGE, @@ -898,7 +900,7 @@ grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call) { return call->incoming_stream_compression_algorithm; } -static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) { +static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) { return (grpc_linked_mdelem *)&md->internal_data; } @@ -922,7 +924,7 @@ static int prepare_application_metadata( for (i = 0; i < total_count; i++) { const grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); - grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + grpc_linked_mdelem *l = linked_from_md(md); GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); if (!GRPC_LOG_IF_ERROR("validate_metadata", grpc_validate_header_key_is_legal(md->key))) { @@ -939,7 +941,7 @@ static int prepare_application_metadata( for (int j = 0; j < i; j++) { const grpc_metadata *md = get_md_elem(metadata, additional_metadata, j, count); - grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + grpc_linked_mdelem *l = linked_from_md(md); GRPC_MDELEM_UNREF(exec_ctx, l->md); } return 0; @@ -957,9 +959,12 @@ static int prepare_application_metadata( } for (i = 0; i < total_count; i++) { grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); - GRPC_LOG_IF_ERROR( - "prepare_application_metadata", - grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md))); + grpc_linked_mdelem *l = linked_from_md(md); + grpc_error *error = grpc_metadata_batch_link_tail(exec_ctx, batch, l); + if (error != GRPC_ERROR_NONE) { + GRPC_MDELEM_UNREF(exec_ctx, l->md); + } + GRPC_LOG_IF_ERROR("prepare_application_metadata", error); } call->send_extra_metadata_count = 0; @@ -1571,7 +1576,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1900,7 +1905,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_INTERNAL_REF(call, "completion"); if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); @@ -2021,6 +2026,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) { return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH"; case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS"; + case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN: + return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN"; case GRPC_CALL_OK: return "GRPC_CALL_OK"; } diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 80eb80af78..e85b308850 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); op->send_ping = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 978d7b4171..3d82a32e82 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -196,7 +196,7 @@ typedef struct cq_vtable { void (*init)(void *data); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); void (*destroy)(void *data); - void (*begin_op)(grpc_completion_queue *cq, void *tag); + bool (*begin_op)(grpc_completion_queue *cq, void *tag); void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); -static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); -static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); +static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, @@ -522,33 +522,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, } } -static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { - cq_next_data *cqd = DATA_FROM_CQ(cq); - GPR_ASSERT(!cqd->shutdown_called); - gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); -} - -static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); - GPR_ASSERT(!cqd->shutdown_called); - gpr_ref(&cqd->pending_events); -} - -void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { -#ifndef NDEBUG - gpr_mu_lock(cq->mu); - if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { - cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); - cq->outstanding_tags = - gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * - cq->outstanding_tag_capacity); - } - cq->outstanding_tags[cq->outstanding_tag_count++] = tag; - gpr_mu_unlock(cq->mu); -#endif - cq->vtable->begin_op(cq, tag); -} - #ifndef NDEBUG static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { int found = 0; @@ -576,6 +549,41 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} #endif +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + while (true) { + gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events); + if (count == 0) { + return false; + } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) { + break; + } + } + return true; +} + +static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); + GPR_ASSERT(!cqd->shutdown_called); + gpr_ref(&cqd->pending_events); + return true; +} + +bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { +#ifndef NDEBUG + gpr_mu_lock(cq->mu); + if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { + cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); + cq->outstanding_tags = + gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * + cq->outstanding_tag_capacity); + } + cq->outstanding_tags[cq->outstanding_tag_count++] = tag; + gpr_mu_unlock(cq->mu); +#endif + return cq->vtable->begin_op(cq, tag); +} + /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_NEXT) */ @@ -855,8 +863,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, inconsistent state. If it is the latter, we shold do a 0-timeout poll so that the thread comes back quickly from poll to make a second attempt at popping. Not doing this can potentially deadlock this - thread - forever (if the deadline is infinity) */ + thread forever (if the deadline is infinity) */ if (cq_event_queue_num_items(&cqd->queue) > 0) { iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); } @@ -869,10 +876,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, if (cq_event_queue_num_items(&cqd->queue) > 0) { /* Go to the beginning of the loop. No point doing a poll because (cq->shutdown == true) is only possible when there is no pending - work - (i.e cq->pending_events == 0) and any outstanding - grpc_cq_completion - events are already queued on this cq */ + work (i.e cq->pending_events == 0) and any outstanding completion + events should have already been queued on this cq */ continue; } @@ -909,11 +914,6 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, is_finished_arg.first_loop = false; } - GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); - grpc_exec_ctx_finish(&exec_ctx); - GPR_ASSERT(is_finished_arg.stolen_completion == NULL); - if (cq_event_queue_num_items(&cqd->queue) > 0 && gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); @@ -921,6 +921,11 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, gpr_mu_unlock(cq->mu); } + GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(is_finished_arg.stolen_completion == NULL); + GPR_TIMER_END("grpc_completion_queue_next", 0); return ret; diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index af44482513..69d144bd95 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. - \a tag is currently used only in debug builds. */ -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); + \a tag is currently used only in debug builds. Return true on success, and + false if completion_queue has been shutdown. */ +bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to grpc_cq_begin_op */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index fce7f8dca1..66dcc299aa 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, } /* stay locked, and gather up some stuff to do */ - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1446,7 +1446,11 @@ grpc_call_error grpc_server_request_call( error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } - grpc_cq_begin_op(cq_for_notification, tag); + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { + gpr_free(rc); + error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + goto done; + } details->reserved = NULL; rc->cq_idx = cq_idx; rc->type = BATCH_CALL; @@ -1496,7 +1500,11 @@ grpc_call_error grpc_server_request_registered_call( error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH; goto done; } - grpc_cq_begin_op(cq_for_notification, tag); + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { + gpr_free(rc); + error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + goto done; + } rc->cq_idx = cq_idx; rc->type = REGISTERED_CALL; rc->server = server; diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index 1280680663..810447313c 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -391,7 +391,7 @@ static tsi_result fake_handshaker_result_create_frame_protector( } static tsi_result fake_handshaker_result_get_unused_bytes( - const tsi_handshaker_result *self, unsigned char **bytes, + const tsi_handshaker_result *self, const unsigned char **bytes, size_t *bytes_size) { fake_handshaker_result *result = (fake_handshaker_result *)self; *bytes_size = result->unused_bytes_size; diff --git a/src/core/tsi/transport_security.c b/src/core/tsi/transport_security.c index be11d64472..2b1f4310c1 100644 --- a/src/core/tsi/transport_security.c +++ b/src/core/tsi/transport_security.c @@ -240,7 +240,7 @@ tsi_result tsi_handshaker_result_create_frame_protector( } tsi_result tsi_handshaker_result_get_unused_bytes( - const tsi_handshaker_result *self, unsigned char **bytes, + const tsi_handshaker_result *self, const unsigned char **bytes, size_t *bytes_size) { if (self == NULL || bytes == NULL || bytes_size == NULL) { return TSI_INVALID_ARGUMENT; diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h index 4a56c25602..2c7db6bca9 100644 --- a/src/core/tsi/transport_security.h +++ b/src/core/tsi/transport_security.h @@ -90,7 +90,8 @@ typedef struct { size_t *max_output_protected_frame_size, tsi_frame_protector **protector); tsi_result (*get_unused_bytes)(const tsi_handshaker_result *self, - unsigned char **bytes, size_t *bytes_size); + const unsigned char **bytes, + size_t *bytes_size); void (*destroy)(tsi_handshaker_result *self); } tsi_handshaker_result_vtable; diff --git a/src/core/tsi/transport_security_adapter.c b/src/core/tsi/transport_security_adapter.c index a0564945e4..b6dc660c47 100644 --- a/src/core/tsi/transport_security_adapter.c +++ b/src/core/tsi/transport_security_adapter.c @@ -50,7 +50,7 @@ static tsi_result adapter_result_create_frame_protector( } static tsi_result adapter_result_get_unused_bytes( - const tsi_handshaker_result *self, unsigned char **bytes, + const tsi_handshaker_result *self, const unsigned char **bytes, size_t *byte_size) { tsi_adapter_handshaker_result *impl = (tsi_adapter_handshaker_result *)self; *bytes = impl->unused_bytes; diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h index 137f8ee5c3..39ba8addc4 100644 --- a/src/core/tsi/transport_security_interface.h +++ b/src/core/tsi/transport_security_interface.h @@ -221,7 +221,7 @@ tsi_result tsi_handshaker_result_create_frame_protector( Ownership of the bytes is retained by the handshaker result. As a consequence, the caller must not free the bytes. */ tsi_result tsi_handshaker_result_get_unused_bytes( - const tsi_handshaker_result *self, unsigned char **bytes, + const tsi_handshaker_result *self, const unsigned char **bytes, size_t *byte_size); /* This method releases the tsi_handshaker_handshaker object. After this method |