/* * * Copyright 2017 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include "src/core/ext/filters/client_channel/retry_throttle.h" #include #include #include #include #include #include #include // // server_retry_throttle_data // struct grpc_server_retry_throttle_data { gpr_refcount refs; int max_milli_tokens; int milli_token_ratio; gpr_atm milli_tokens; // A pointer to the replacement for this grpc_server_retry_throttle_data // entry. If non-NULL, then this entry is stale and must not be used. // We hold a reference to the replacement. gpr_atm replacement; }; static void get_replacement_throttle_data_if_needed( grpc_server_retry_throttle_data** throttle_data) { while (true) { grpc_server_retry_throttle_data* new_throttle_data = (grpc_server_retry_throttle_data*)gpr_atm_acq_load( &(*throttle_data)->replacement); if (new_throttle_data == nullptr) return; *throttle_data = new_throttle_data; } } bool grpc_server_retry_throttle_data_record_failure( grpc_server_retry_throttle_data* throttle_data) { // First, check if we are stale and need to be replaced. get_replacement_throttle_data_if_needed(&throttle_data); // We decrement milli_tokens by 1000 (1 token) for each failure. const int new_value = (int)gpr_atm_no_barrier_clamped_add( &throttle_data->milli_tokens, (gpr_atm)-1000, (gpr_atm)0, (gpr_atm)throttle_data->max_milli_tokens); // Retries are allowed as long as the new value is above the threshold // (max_milli_tokens / 2). return new_value > throttle_data->max_milli_tokens / 2; } void grpc_server_retry_throttle_data_record_success( grpc_server_retry_throttle_data* throttle_data) { // First, check if we are stale and need to be replaced. get_replacement_throttle_data_if_needed(&throttle_data); // We increment milli_tokens by milli_token_ratio for each success. gpr_atm_no_barrier_clamped_add( &throttle_data->milli_tokens, (gpr_atm)throttle_data->milli_token_ratio, (gpr_atm)0, (gpr_atm)throttle_data->max_milli_tokens); } grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( grpc_server_retry_throttle_data* throttle_data) { gpr_ref(&throttle_data->refs); return throttle_data; } void grpc_server_retry_throttle_data_unref( grpc_server_retry_throttle_data* throttle_data) { if (gpr_unref(&throttle_data->refs)) { grpc_server_retry_throttle_data* replacement = (grpc_server_retry_throttle_data*)gpr_atm_acq_load( &throttle_data->replacement); if (replacement != nullptr) { grpc_server_retry_throttle_data_unref(replacement); } gpr_free(throttle_data); } } static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( int max_milli_tokens, int milli_token_ratio, grpc_server_retry_throttle_data* old_throttle_data) { grpc_server_retry_throttle_data* throttle_data = (grpc_server_retry_throttle_data*)gpr_malloc(sizeof(*throttle_data)); memset(throttle_data, 0, sizeof(*throttle_data)); gpr_ref_init(&throttle_data->refs, 1); throttle_data->max_milli_tokens = max_milli_tokens; throttle_data->milli_token_ratio = milli_token_ratio; int initial_milli_tokens = max_milli_tokens; // If there was a pre-existing entry for this server name, initialize // the token count by scaling proportionately to the old data. This // ensures that if we're already throttling retries on the old scale, // we will start out doing the same thing on the new one. if (old_throttle_data != nullptr) { double token_fraction = (int)gpr_atm_acq_load(&old_throttle_data->milli_tokens) / (double)old_throttle_data->max_milli_tokens; initial_milli_tokens = (int)(token_fraction * max_milli_tokens); } gpr_atm_rel_store(&throttle_data->milli_tokens, (gpr_atm)initial_milli_tokens); // If there was a pre-existing entry, mark it as stale and give it a // pointer to the new entry, which is its replacement. if (old_throttle_data != nullptr) { grpc_server_retry_throttle_data_ref(throttle_data); gpr_atm_rel_store(&old_throttle_data->replacement, (gpr_atm)throttle_data); } return throttle_data; } // // avl vtable for string -> server_retry_throttle_data map // static void* copy_server_name(void* key, void* unused) { return gpr_strdup((const char*)key); } static long compare_server_name(void* key1, void* key2, void* unused) { return strcmp((const char*)key1, (const char*)key2); } static void destroy_server_retry_throttle_data(void* value, void* unused) { grpc_server_retry_throttle_data* throttle_data = (grpc_server_retry_throttle_data*)value; grpc_server_retry_throttle_data_unref(throttle_data); } static void* copy_server_retry_throttle_data(void* value, void* unused) { grpc_server_retry_throttle_data* throttle_data = (grpc_server_retry_throttle_data*)value; return grpc_server_retry_throttle_data_ref(throttle_data); } static void destroy_server_name(void* key, void* unused) { gpr_free(key); } static const gpr_avl_vtable avl_vtable = { destroy_server_name, copy_server_name, compare_server_name, destroy_server_retry_throttle_data, copy_server_retry_throttle_data}; // // server_retry_throttle_map // static gpr_mu g_mu; static gpr_avl g_avl; void grpc_retry_throttle_map_init() { gpr_mu_init(&g_mu); g_avl = gpr_avl_create(&avl_vtable); } void grpc_retry_throttle_map_shutdown() { gpr_mu_destroy(&g_mu); gpr_avl_unref(g_avl, nullptr); } grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( const char* server_name, int max_milli_tokens, int milli_token_ratio) { gpr_mu_lock(&g_mu); grpc_server_retry_throttle_data* throttle_data = (grpc_server_retry_throttle_data*)gpr_avl_get(g_avl, (char*)server_name, nullptr); if (throttle_data == nullptr) { // Entry not found. Create a new one. throttle_data = grpc_server_retry_throttle_data_create( max_milli_tokens, milli_token_ratio, nullptr); g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data, nullptr); } else { if (throttle_data->max_milli_tokens != max_milli_tokens || throttle_data->milli_token_ratio != milli_token_ratio) { // Entry found but with old parameters. Create a new one based on // the original one. throttle_data = grpc_server_retry_throttle_data_create( max_milli_tokens, milli_token_ratio, throttle_data); g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data, nullptr); } else { // Entry found. Increase refcount. grpc_server_retry_throttle_data_ref(throttle_data); } } gpr_mu_unlock(&g_mu); return throttle_data; }