diff options
-rw-r--r-- | grpc.def | 4 | ||||
-rw-r--r-- | include/grpc/impl/codegen/grpc_types.h | 5 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 171 | ||||
-rw-r--r-- | src/core/lib/channel/message_size_filter.c | 17 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.c | 8 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.h | 12 | ||||
-rw-r--r-- | test/cpp/grpclb/grpclb_test.cc | 10 |
7 files changed, 181 insertions, 46 deletions
@@ -145,6 +145,7 @@ EXPORTS grpc_slice_from_copied_string grpc_slice_from_copied_buffer grpc_slice_from_static_string + grpc_slice_from_static_buffer grpc_slice_sub grpc_slice_sub_no_ref grpc_slice_split_tail @@ -156,8 +157,11 @@ EXPORTS grpc_slice_buf_cmp grpc_slice_buf_start_eq grpc_slice_rchr + grpc_slice_chr + grpc_slice_slice grpc_slice_hash grpc_slice_is_equivalent + grpc_slice_dup grpc_slice_buffer_init grpc_slice_buffer_destroy grpc_slice_buffer_add diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index ac9a007641..04d01eaffd 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -261,6 +261,11 @@ typedef enum grpc_call_error { GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH } grpc_call_error; +/* Default send/receive message size limits in bytes. -1 for unlimited. */ +/* TODO(roth) Make this match the default receive limit after next release */ +#define GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 +#define GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024) + /* Write Flags: */ /** Hint that the write may be buffered and need not go out on the wire immediately. GRPC is free to buffer the message until the next non-buffered diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 795d68d5e7..ebb3f8044b 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -480,6 +480,68 @@ static grpc_lb_addresses *process_serverlist_locked( return lb_addresses; } +/* returns true if the new RR policy should replace the current one, if any */ +static bool update_lb_connectivity_status_locked( + grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, + grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { + grpc_error *curr_state_error; + const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check( + &glb_policy->state_tracker, &curr_state_error); + + /* The new connectivity status is a function of the previous one and the new + * input coming from the status of the RR policy. + * + * current state (grpclb's) + * | + * v || I | C | R | TF | SD | <- new state (RR's) + * ===++====+=====+=====+======+======+ + * I || I | C | R | [I] | [I] | + * ---++----+-----+-----+------+------+ + * C || I | C | R | [C] | [C] | + * ---++----+-----+-----+------+------+ + * R || I | C | R | [R] | [R] | + * ---++----+-----+-----+------+------+ + * TF || I | C | R | [TF] | [TF] | + * ---++----+-----+-----+------+------+ + * SD || NA | NA | NA | NA | NA | (*) + * ---++----+-----+-----+------+------+ + * + * A [STATE] indicates that the old RR policy is kept. In those cases, STATE + * is the current state of grpclb, which is left untouched. + * + * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to + * the previous RR instance. + * + * Note that the status is never updated to SHUTDOWN as a result of calling + * this function. Only glb_shutdown() has the power to set that state. + * + * (*) This function mustn't be called during shutting down. */ + GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); + + switch (new_rr_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: + GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE); + return false; /* don't replace the RR policy */ + case GRPC_CHANNEL_INIT: + case GRPC_CHANNEL_IDLE: + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_READY: + GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE); + } + + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "Setting grpclb's state to %s from new RR policy %p state.", + grpc_connectivity_state_name(new_rr_state), + (void *)glb_policy->rr_policy); + } + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, + new_rr_state, GRPC_ERROR_REF(new_rr_state_error), + "update_lb_connectivity_status_locked"); + return true; +} + /* perform a pick over \a rr_policy. Given that a pick can return immediately * (ignoring its completion callback) we need to perform the cleanups this * callback would be otherwise resposible for */ @@ -543,49 +605,84 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); /* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy, grpc_error *error) { + glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->serverlist != NULL && glb_policy->serverlist->num_servers > 0); + if (glb_policy->shutting_down) return; + + grpc_lb_policy *new_rr_policy = + create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); + if (new_rr_policy == NULL) { + gpr_log(GPR_ERROR, + "Failure creating a RoundRobin policy for serverlist update with " + "%lu entries. The previous RR instance (%p), if any, will continue " + "to be used. Future updates from the LB will attempt to create new " + "instances.", + (unsigned long)glb_policy->serverlist->num_servers, + (void *)glb_policy->rr_policy); + return; + } + + grpc_error *new_rr_state_error = NULL; + const grpc_connectivity_state new_rr_state = + grpc_lb_policy_check_connectivity(exec_ctx, new_rr_policy, + &new_rr_state_error); + /* Connectivity state is a function of the new RR policy just created */ + const bool replace_old_rr = update_lb_connectivity_status_locked( + exec_ctx, glb_policy, new_rr_state, new_rr_state_error); + + if (!replace_old_rr) { + /* dispose of the new RR policy that won't be used after all */ + GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace"); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "Keeping old RR policy (%p) despite new serverlist: new RR " + "policy was in %s connectivity state.", + (void *)glb_policy->rr_policy, + grpc_connectivity_state_name(new_rr_state)); + } + return; + } + if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy); + gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)", + (void *)new_rr_policy, (void *)glb_policy->rr_policy); } + if (glb_policy->rr_policy != NULL) { /* if we are phasing out an existing RR instance, unref it. */ GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover"); } - glb_policy->rr_policy = - create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy); - } + /* Finally update the RR policy to the newly created one */ + glb_policy->rr_policy = new_rr_policy; - GPR_ASSERT(glb_policy->rr_policy != NULL); + /* Add the gRPC LB's interested_parties pollset_set to that of the newly + * created RR policy. This will make the RR policy progress upon activity on + * gRPC LB, which in turn is tied to the application's call */ grpc_pollset_set_add_pollset_set(exec_ctx, glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); + /* Allocate the data for the tracking of the new RR policy's connectivity. + * It'll be deallocated in glb_rr_connectivity_changed() */ rr_connectivity_data *rr_connectivity = gpr_malloc(sizeof(rr_connectivity_data)); memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, rr_connectivity); rr_connectivity->glb_policy = glb_policy; - rr_connectivity->state = grpc_lb_policy_check_connectivity( - exec_ctx, glb_policy->rr_policy, &error); + rr_connectivity->state = new_rr_state; - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_connectivity->state, GRPC_ERROR_REF(error), - "rr_handover"); - /* subscribe */ + /* Subscribe to changes to the connectivity of the new RR */ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); - /* flush pending ops */ + /* Update picks and pings in wait */ pending_pick *pp; while ((pp = glb_policy->pending_picks)) { glb_policy->pending_picks = pp->next; @@ -616,28 +713,36 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - /* If shutdown or error free the arg. Rely on the rest of the code to set the - * right grpclb status. */ - rr_connectivity_data *rr_conn_data = arg; - glb_lb_policy *glb_policy = rr_conn_data->glb_policy; - gpr_mu_lock(&glb_policy->mu); + rr_connectivity_data *rr_connectivity = arg; + glb_lb_policy *glb_policy = rr_connectivity->glb_policy; - if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN && - !glb_policy->shutting_down) { - /* RR not shutting down. Mimic the RR's policy state */ - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_conn_data->state, GRPC_ERROR_REF(error), - "rr_connectivity_cb"); - /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ + gpr_mu_lock(&glb_policy->mu); + const bool shutting_down = glb_policy->shutting_down; + bool unref_needed = false; + GRPC_ERROR_REF(error); + + if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) { + /* RR policy shutting down. Don't renew subscription and free the arg of + * this callback. In addition we need to stash away the current policy to + * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last + * one, the policy would be destroyed, alongside the lock, which would + * result in a use-after-free */ + unref_needed = true; + gpr_free(rr_connectivity); + } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */ + update_lb_connectivity_status_locked(exec_ctx, glb_policy, + rr_connectivity->state, error); + /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, - &rr_conn_data->state, - &rr_conn_data->on_change); - } else { + &rr_connectivity->state, + &rr_connectivity->on_change); + } + gpr_mu_unlock(&glb_policy->mu); + if (unref_needed) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "rr_connectivity_cb"); - gpr_free(rr_conn_data); } - gpr_mu_unlock(&glb_policy->mu); + GRPC_ERROR_UNREF(error); } static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, @@ -1145,7 +1250,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, /* and update the copy in the glb_lb_policy instance */ glb_policy->serverlist = serverlist; - rr_handover_locked(exec_ctx, glb_policy, error); + rr_handover_locked(exec_ctx, glb_policy); } } else { if (grpc_lb_glb_trace) { diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index bddfba4770..debd41cca2 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -34,6 +34,7 @@ #include <limits.h> #include <string.h> +#include <grpc/impl/codegen/grpc_types.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> @@ -42,10 +43,6 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/service_config.h" -#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited. -// The protobuf library will (by default) start warning at 100 megs. -#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024) - typedef struct message_size_limits { int max_send_size; int max_recv_size; @@ -205,20 +202,20 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx, GPR_ASSERT(!args->is_last); channel_data* chand = elem->channel_data; memset(chand, 0, sizeof(*chand)); - chand->max_send_size = DEFAULT_MAX_SEND_MESSAGE_LENGTH; - chand->max_recv_size = DEFAULT_MAX_RECV_MESSAGE_LENGTH; + chand->max_send_size = GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH; + chand->max_recv_size = GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH; for (size_t i = 0; i < args->channel_args->num_args; ++i) { if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = {DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, - INT_MAX}; + const grpc_integer_options options = { + GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, INT_MAX}; chand->max_send_size = grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = {DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, - INT_MAX}; + const grpc_integer_options options = { + GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, INT_MAX}; chand->max_recv_size = grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 1d5f70e534..6c3a15c07f 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -183,6 +183,7 @@ grpc_slice_intern_type grpc_slice_intern_import; grpc_slice_from_copied_string_type grpc_slice_from_copied_string_import; grpc_slice_from_copied_buffer_type grpc_slice_from_copied_buffer_import; grpc_slice_from_static_string_type grpc_slice_from_static_string_import; +grpc_slice_from_static_buffer_type grpc_slice_from_static_buffer_import; grpc_slice_sub_type grpc_slice_sub_import; grpc_slice_sub_no_ref_type grpc_slice_sub_no_ref_import; grpc_slice_split_tail_type grpc_slice_split_tail_import; @@ -194,8 +195,11 @@ grpc_slice_str_cmp_type grpc_slice_str_cmp_import; grpc_slice_buf_cmp_type grpc_slice_buf_cmp_import; grpc_slice_buf_start_eq_type grpc_slice_buf_start_eq_import; grpc_slice_rchr_type grpc_slice_rchr_import; +grpc_slice_chr_type grpc_slice_chr_import; +grpc_slice_slice_type grpc_slice_slice_import; grpc_slice_hash_type grpc_slice_hash_import; grpc_slice_is_equivalent_type grpc_slice_is_equivalent_import; +grpc_slice_dup_type grpc_slice_dup_import; grpc_slice_buffer_init_type grpc_slice_buffer_init_import; grpc_slice_buffer_destroy_type grpc_slice_buffer_destroy_import; grpc_slice_buffer_add_type grpc_slice_buffer_add_import; @@ -465,6 +469,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_slice_from_copied_string_import = (grpc_slice_from_copied_string_type) GetProcAddress(library, "grpc_slice_from_copied_string"); grpc_slice_from_copied_buffer_import = (grpc_slice_from_copied_buffer_type) GetProcAddress(library, "grpc_slice_from_copied_buffer"); grpc_slice_from_static_string_import = (grpc_slice_from_static_string_type) GetProcAddress(library, "grpc_slice_from_static_string"); + grpc_slice_from_static_buffer_import = (grpc_slice_from_static_buffer_type) GetProcAddress(library, "grpc_slice_from_static_buffer"); grpc_slice_sub_import = (grpc_slice_sub_type) GetProcAddress(library, "grpc_slice_sub"); grpc_slice_sub_no_ref_import = (grpc_slice_sub_no_ref_type) GetProcAddress(library, "grpc_slice_sub_no_ref"); grpc_slice_split_tail_import = (grpc_slice_split_tail_type) GetProcAddress(library, "grpc_slice_split_tail"); @@ -476,8 +481,11 @@ void grpc_rb_load_imports(HMODULE library) { grpc_slice_buf_cmp_import = (grpc_slice_buf_cmp_type) GetProcAddress(library, "grpc_slice_buf_cmp"); grpc_slice_buf_start_eq_import = (grpc_slice_buf_start_eq_type) GetProcAddress(library, "grpc_slice_buf_start_eq"); grpc_slice_rchr_import = (grpc_slice_rchr_type) GetProcAddress(library, "grpc_slice_rchr"); + grpc_slice_chr_import = (grpc_slice_chr_type) GetProcAddress(library, "grpc_slice_chr"); + grpc_slice_slice_import = (grpc_slice_slice_type) GetProcAddress(library, "grpc_slice_slice"); grpc_slice_hash_import = (grpc_slice_hash_type) GetProcAddress(library, "grpc_slice_hash"); grpc_slice_is_equivalent_import = (grpc_slice_is_equivalent_type) GetProcAddress(library, "grpc_slice_is_equivalent"); + grpc_slice_dup_import = (grpc_slice_dup_type) GetProcAddress(library, "grpc_slice_dup"); grpc_slice_buffer_init_import = (grpc_slice_buffer_init_type) GetProcAddress(library, "grpc_slice_buffer_init"); grpc_slice_buffer_destroy_import = (grpc_slice_buffer_destroy_type) GetProcAddress(library, "grpc_slice_buffer_destroy"); grpc_slice_buffer_add_import = (grpc_slice_buffer_add_type) GetProcAddress(library, "grpc_slice_buffer_add"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 7fee273bb5..799cfaabfe 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -500,6 +500,9 @@ extern grpc_slice_from_copied_buffer_type grpc_slice_from_copied_buffer_import; typedef grpc_slice(*grpc_slice_from_static_string_type)(const char *source); extern grpc_slice_from_static_string_type grpc_slice_from_static_string_import; #define grpc_slice_from_static_string grpc_slice_from_static_string_import +typedef grpc_slice(*grpc_slice_from_static_buffer_type)(const void *source, size_t len); +extern grpc_slice_from_static_buffer_type grpc_slice_from_static_buffer_import; +#define grpc_slice_from_static_buffer grpc_slice_from_static_buffer_import typedef grpc_slice(*grpc_slice_sub_type)(grpc_slice s, size_t begin, size_t end); extern grpc_slice_sub_type grpc_slice_sub_import; #define grpc_slice_sub grpc_slice_sub_import @@ -533,12 +536,21 @@ extern grpc_slice_buf_start_eq_type grpc_slice_buf_start_eq_import; typedef int(*grpc_slice_rchr_type)(grpc_slice s, char c); extern grpc_slice_rchr_type grpc_slice_rchr_import; #define grpc_slice_rchr grpc_slice_rchr_import +typedef int(*grpc_slice_chr_type)(grpc_slice s, char c); +extern grpc_slice_chr_type grpc_slice_chr_import; +#define grpc_slice_chr grpc_slice_chr_import +typedef int(*grpc_slice_slice_type)(grpc_slice haystack, grpc_slice needle); +extern grpc_slice_slice_type grpc_slice_slice_import; +#define grpc_slice_slice grpc_slice_slice_import typedef uint32_t(*grpc_slice_hash_type)(grpc_slice s); extern grpc_slice_hash_type grpc_slice_hash_import; #define grpc_slice_hash grpc_slice_hash_import typedef int(*grpc_slice_is_equivalent_type)(grpc_slice a, grpc_slice b); extern grpc_slice_is_equivalent_type grpc_slice_is_equivalent_import; #define grpc_slice_is_equivalent grpc_slice_is_equivalent_import +typedef grpc_slice(*grpc_slice_dup_type)(grpc_slice a); +extern grpc_slice_dup_type grpc_slice_dup_import; +#define grpc_slice_dup grpc_slice_dup_import typedef void(*grpc_slice_buffer_init_type)(grpc_slice_buffer *sb); extern grpc_slice_buffer_init_type grpc_slice_buffer_init_import; #define grpc_slice_buffer_init grpc_slice_buffer_init_import diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index e5bf18991b..57a53ca11e 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -79,6 +79,9 @@ extern "C" { // - Test against a non-LB server. // - Random LB server closing the stream unexpectedly. // - Test using DNS-resolvable names (localhost?) +// - Test handling of creation of faulty RR instance by having the LB return a +// serverlist with non-existent backends after having initially returned a +// valid one. // // Findings from end to end testing to be covered here: // - Handling of LB servers restart, including reconnection after backing-off @@ -342,9 +345,8 @@ static void start_backend_server(server_fixture *sf) { } GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); const string expected_token = - strlen(sf->lb_token_prefix) == 0 - ? "" - : sf->lb_token_prefix + std::to_string(sf->port); + strlen(sf->lb_token_prefix) == 0 ? "" : sf->lb_token_prefix + + std::to_string(sf->port); GPR_ASSERT(contains_metadata(&request_metadata_recv, "lb-token", expected_token.c_str())); @@ -522,6 +524,8 @@ static void perform_request(client_fixture *cf) { CQ_EXPECT_COMPLETION(cqv, tag(2), 1); cq_verify(cqv); GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD)); + GPR_ASSERT(grpc_channel_check_connectivity_state( + cf->client, 0 /* try to connect */) == GRPC_CHANNEL_READY); grpc_byte_buffer_destroy(request_payload); grpc_byte_buffer_destroy(response_payload_recv); |