diff options
-rw-r--r-- | src/core/ext/filters/max_age/max_age_filter.c | 26 | ||||
-rw-r--r-- | src/core/lib/security/transport/security_handshaker.c | 9 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 8 | ||||
-rw-r--r-- | src/python/grpcio_reflection/MANIFEST.in | 4 | ||||
-rw-r--r-- | test/core/end2end/tests/max_connection_age.c | 5 | ||||
-rw-r--r-- | test/cpp/interop/client.cc | 3 | ||||
-rw-r--r-- | test/cpp/interop/client_helper.cc | 3 | ||||
-rw-r--r-- | test/cpp/interop/client_helper.h | 4 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_call_create.cc | 6 |
9 files changed, 56 insertions, 12 deletions
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c index b03cb0ba0a..f858220c01 100644 --- a/src/core/ext/filters/max_age/max_age_filter.c +++ b/src/core/ext/filters/max_age/max_age_filter.c @@ -46,6 +46,7 @@ #define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX #define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX #define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX +#define MAX_CONNECTION_AGE_JITTER 0.1 typedef struct channel_data { /* We take a reference to the channel stack for the timer callback */ @@ -254,6 +255,21 @@ static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg, } } +/* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out + connection storms. Note that the MAX_CONNECTION_AGE option without jitter + would not create connection storms by itself, but if there happened to be a + connection storm it could cause it to repeat at a fixed period. */ +static int add_random_max_connection_age_jitter(int value) { + /* generate a random number between 1 - MAX_CONNECTION_AGE_JITTER and + 1 + MAX_CONNECTION_AGE_JITTER */ + double multiplier = rand() * MAX_CONNECTION_AGE_JITTER * 2.0 / RAND_MAX + + 1.0 - MAX_CONNECTION_AGE_JITTER; + double result = multiplier * value; + /* INT_MAX - 0.5 converts the value to float, so that result will not be + cast to int implicitly before the comparison. */ + return result > INT_MAX - 0.5 ? INT_MAX : (int)result; +} + /* Constructor for call_data. */ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, @@ -283,7 +299,9 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, chand->max_connection_age = DEFAULT_MAX_CONNECTION_AGE_MS == INT_MAX ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_AGE_MS, GPR_TIMESPAN); + : gpr_time_from_millis(add_random_max_connection_age_jitter( + DEFAULT_MAX_CONNECTION_AGE_MS), + GPR_TIMESPAN); chand->max_connection_age_grace = DEFAULT_MAX_CONNECTION_AGE_GRACE_MS == INT_MAX ? gpr_inf_future(GPR_TIMESPAN) @@ -300,8 +318,10 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, &args->channel_args->args[i], (grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX}); chand->max_connection_age = - value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(value, GPR_TIMESPAN); + value == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_millis( + add_random_max_connection_age_jitter(value), GPR_TIMESPAN); } else if (0 == strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)) { const int value = grpc_channel_arg_get_integer( diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 2f39327670..509b4b556d 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -287,12 +287,11 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, if (num_left_overs > 0) { /* Put the leftovers in our buffer (ownership transfered). */ if (has_left_overs_in_current_slice) { - grpc_slice_buffer_add( - &h->left_overs, - grpc_slice_split_tail(&h->args->read_buffer->slices[i], - consumed_slice_size)); + grpc_slice tail = grpc_slice_split_tail(&h->args->read_buffer->slices[i], + consumed_slice_size); + grpc_slice_buffer_add(&h->left_overs, tail); /* split_tail above increments refcount. */ - grpc_slice_unref_internal(exec_ctx, h->args->read_buffer->slices[i]); + grpc_slice_unref_internal(exec_ctx, tail); } grpc_slice_buffer_addn( &h->left_overs, &h->args->read_buffer->slices[i + 1], diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 87787b3eea..c37ead2318 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -175,7 +175,7 @@ struct grpc_call { /* have we received initial metadata */ bool has_initial_md_been_received; - batch_control active_batches[MAX_CONCURRENT_BATCHES]; + batch_control *active_batches[MAX_CONCURRENT_BATCHES]; grpc_transport_stream_op_batch_payload stream_op_payload; /* first idx: is_receiving, second idx: is_trailing */ @@ -1034,7 +1034,11 @@ static batch_control *allocate_batch_control(grpc_call *call, const grpc_op *ops, size_t num_ops) { int slot = batch_slot_for_op(ops[0].op); - batch_control *bctl = &call->active_batches[slot]; + batch_control **pslot = &call->active_batches[slot]; + if (*pslot == NULL) { + *pslot = gpr_arena_alloc(call->arena, sizeof(batch_control)); + } + batch_control *bctl = *pslot; if (bctl->call != NULL) { return NULL; } diff --git a/src/python/grpcio_reflection/MANIFEST.in b/src/python/grpcio_reflection/MANIFEST.in new file mode 100644 index 0000000000..0f2130c0b5 --- /dev/null +++ b/src/python/grpcio_reflection/MANIFEST.in @@ -0,0 +1,4 @@ +include grpc_version.py +include reflection_commands.py +graft grpc_reflection +global-exclude *.pyc diff --git a/test/core/end2end/tests/max_connection_age.c b/test/core/end2end/tests/max_connection_age.c index 59bfdbabb9..04bdb39445 100644 --- a/test/core/end2end/tests/max_connection_age.c +++ b/test/core/end2end/tests/max_connection_age.c @@ -47,6 +47,7 @@ #define MAX_CONNECTION_AGE_GRACE_MS 1000 #define MAX_CONNECTION_IDLE_MS 9999 +#define MAX_CONNECTION_AGE_JITTER_MULTIPLIER 1.1 #define CALL_DEADLINE_S 10 /* The amount of time we wait for the connection to time out, but after it the connection should not use up its grace period. It should be a number between @@ -169,8 +170,8 @@ static void test_max_age_forcibly_close(grpc_end2end_test_config config) { cq_verify(cqv); gpr_timespec expect_shutdown_time = grpc_timeout_milliseconds_to_deadline( - MAX_CONNECTION_AGE_MS + MAX_CONNECTION_AGE_GRACE_MS + - IMMEDIATE_SHUTDOWN_GRACE_TIME_MS); + (int)(MAX_CONNECTION_AGE_MS * MAX_CONNECTION_AGE_JITTER_MULTIPLIER) + + MAX_CONNECTION_AGE_GRACE_MS + IMMEDIATE_SHUTDOWN_GRACE_TIME_MS); /* Wait for the channel to reach its max age */ cq_verify_empty_timeout(cqv, CQ_MAX_CONNECTION_AGE_WAIT_TIME_S); diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 5688ab7971..369413e6a1 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -99,6 +99,7 @@ DEFINE_bool(do_not_abort_on_transient_failures, false, using grpc::testing::CreateChannelForTestCase; using grpc::testing::GetServiceAccountJsonKey; +using grpc::testing::UpdateActions; int main(int argc, char** argv) { grpc::testing::InitTest(&argc, &argv, true); @@ -165,6 +166,8 @@ int main(int argc, char** argv) { // actions["cacheable_unary"] = // std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client); + UpdateActions(&actions); + if (FLAGS_test_case == "all") { for (const auto& action : actions) { action.second(); diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc index d3192ad0c9..784cd2826d 100644 --- a/test/cpp/interop/client_helper.cc +++ b/test/cpp/interop/client_helper.cc @@ -89,6 +89,9 @@ grpc::string GetOauth2AccessToken() { return access_token; } +void UpdateActions( + std::unordered_map<grpc::string, std::function<bool()>>* actions) {} + std::shared_ptr<Channel> CreateChannelForTestCase( const grpc::string& test_case) { GPR_ASSERT(FLAGS_server_port); diff --git a/test/cpp/interop/client_helper.h b/test/cpp/interop/client_helper.h index 622b96e4fb..387530a21c 100644 --- a/test/cpp/interop/client_helper.h +++ b/test/cpp/interop/client_helper.h @@ -35,6 +35,7 @@ #define GRPC_TEST_CPP_INTEROP_CLIENT_HELPER_H #include <memory> +#include <unordered_map> #include <grpc++/channel.h> @@ -47,6 +48,9 @@ grpc::string GetServiceAccountJsonKey(); grpc::string GetOauth2AccessToken(); +void UpdateActions( + std::unordered_map<grpc::string, std::function<bool()>>* actions); + std::shared_ptr<Channel> CreateChannelForTestCase( const grpc::string& test_case); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index cc37f0c9e9..136b7c0340 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -54,6 +54,7 @@ extern "C" { #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/channel/message_size_filter.h" +#include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/transport_impl.h" } @@ -152,6 +153,7 @@ static void BM_LameChannelCallCreateCpp(benchmark::State &state) { grpc::testing::EchoResponse recv_response; grpc::Status recv_status; while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); grpc::ClientContext cli_ctx; auto reader = stub->AsyncEcho(&cli_ctx, send_request, &cq); reader->Finish(&recv_response, &recv_status, tag(0)); @@ -429,6 +431,7 @@ static void BM_IsolatedFilter(benchmark::State &state) { const int kArenaSize = 4096; call_args.arena = gpr_arena_create(kArenaSize); while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); GRPC_ERROR_UNREF(grpc_call_stack_init(&exec_ctx, channel_stack, 1, DoNothing, NULL, &call_args)); typename TestOp::Op op(&exec_ctx, &test_op_data, call_stack); @@ -596,6 +599,7 @@ static void BM_IsolatedCall_NoOp(benchmark::State &state) { void *method_hdl = grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL); while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); grpc_call_destroy(grpc_channel_create_registered_call( fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(), method_hdl, deadline, NULL)); @@ -634,6 +638,7 @@ static void BM_IsolatedCall_Unary(benchmark::State &state) { ops[5].data.recv_status_on_client.status_details = &status_details; ops[5].data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata; while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); grpc_call *call = grpc_channel_create_registered_call( fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(), method_hdl, deadline, NULL); @@ -676,6 +681,7 @@ static void BM_IsolatedCall_StreamingSend(benchmark::State &state) { ops[0].op = GRPC_OP_SEND_MESSAGE; ops[0].data.send_message.send_message = send_message; while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); grpc_call_start_batch(call, ops, 1, tag(2), NULL); grpc_completion_queue_next(fixture.cq(), gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL); |