diff options
author | Mark D. Roth <roth@google.com> | 2016-09-21 08:12:11 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-09-21 08:12:11 -0700 |
commit | 933c955aafc734b75136510df38e877840c1f649 (patch) | |
tree | 4f3dd658417d51c2da53b0f2e0285b437cc178e0 /src/core | |
parent | 89675154020e5bdabe654b5b65087fe3935b696a (diff) | |
parent | 5bedd48c0e8b9582841c511037bc20781a6c681c (diff) |
Merge remote-tracking branch 'upstream/master' into http_connect
Diffstat (limited to 'src/core')
29 files changed, 614 insertions, 224 deletions
diff --git a/src/core/ext/census/trace_context.c b/src/core/ext/census/trace_context.c new file mode 100644 index 0000000000..fbb20d3448 --- /dev/null +++ b/src/core/ext/census/trace_context.c @@ -0,0 +1,86 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/census/trace_context.h" + +#include <grpc/census.h> +#include <grpc/support/log.h> +#include <stdbool.h> + +#include "third_party/nanopb/pb_decode.h" +#include "third_party/nanopb/pb_encode.h" + +// This function assumes the TraceContext is valid. +size_t encode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer, + const size_t buf_size) { + // Create a stream that will write to our buffer. + pb_ostream_t stream = pb_ostream_from_buffer(buffer, buf_size); + + // encode message + bool status = pb_encode(&stream, google_trace_TraceContext_fields, ctxt); + + if (!status) { + gpr_log(GPR_DEBUG, "TraceContext encoding failed: %s", + PB_GET_ERROR(&stream)); + return 0; + } + + return stream.bytes_written; +} + +bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer, + const size_t nbytes) { + // Create a stream that reads nbytes from the buffer. + pb_istream_t stream = pb_istream_from_buffer(buffer, nbytes); + + // decode message + bool status = pb_decode(&stream, google_trace_TraceContext_fields, ctxt); + + if (!status) { + gpr_log(GPR_DEBUG, "TraceContext decoding failed: %s", + PB_GET_ERROR(&stream)); + return false; + } + + // check fields + if (!ctxt->has_trace_id) { + gpr_log(GPR_DEBUG, "Invalid TraceContext: missing trace_id"); + return false; + } + if (!ctxt->has_span_id) { + gpr_log(GPR_DEBUG, "Invalid TraceContext: missing span_id"); + return false; + } + + return true; +} diff --git a/src/core/ext/census/trace_context.h b/src/core/ext/census/trace_context.h new file mode 100644 index 0000000000..ee71fef460 --- /dev/null +++ b/src/core/ext/census/trace_context.h @@ -0,0 +1,68 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Functions for manipulating trace contexts as defined in + src/proto/census/trace.proto */ +#ifndef GRPC_CORE_EXT_CENSUS_TRACE_CONTEXT_H +#define GRPC_CORE_EXT_CENSUS_TRACE_CONTEXT_H + +#include "src/core/ext/census/gen/trace_context.pb.h" + +/* Maximum number of bytes required to encode a TraceContext (31) +1 byte for trace_id field +1 byte for trace_id length +1 byte for trace_id.hi field +8 bytes for trace_id.hi (uint64_t) +1 byte for trace_id.lo field +8 bytes for trace_id.lo (uint64_t) +1 byte for span_id field +8 bytes for span_id (uint64_t) +1 byte for is_sampled field +1 byte for is_sampled (bool) */ +#define TRACE_MAX_CONTEXT_SIZE 31 + +/* Encode a trace context (ctxt) into proto format to the buffer provided. The +size of buffer must be at least TRACE_MAX_CONTEXT_SIZE. On success, returns the +number of bytes successfully encoded into buffer. On failure, returns 0. */ +size_t encode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer, + const size_t buf_size); + +/* Decode a proto-encoded TraceContext from the provided buffer into the +TraceContext structure (ctxt). The function expects to be supplied the number +of bytes to be read from buffer (nbytes). This function will also validate that +the TraceContext has a span_id and a trace_id, and will return false if either +of these do not exist. On success, returns true and false otherwise. */ +bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer, + const size_t nbytes); + +#endif diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c index 3b5d6dab2b..8f0e12296d 100644 --- a/src/core/ext/census/tracing.c +++ b/src/core/ext/census/tracing.c @@ -31,15 +31,19 @@ * */ +//#include "src/core/ext/census/tracing.h" + #include <grpc/census.h> /* TODO(aveitch): These are all placeholder implementations. */ -int census_trace_mask(const census_context *context) { - return CENSUS_TRACE_MASK_NONE; -} +// int census_trace_mask(const census_context *context) { +// return CENSUS_TRACE_MASK_NONE; +// } + +// void census_set_trace_mask(int trace_mask) {} -void census_set_trace_mask(int trace_mask) {} +// void census_trace_print(census_context *context, uint32_t type, +// const char *buffer, size_t n) {} -void census_trace_print(census_context *context, uint32_t type, - const char *buffer, size_t n) {} +// void SetTracerParams(const Params& params); diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 6b779efe57..b2b4fea83c 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -42,6 +42,7 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -63,6 +64,8 @@ typedef struct client_channel_channel_data { grpc_resolver *resolver; /** have we started resolving this channel */ bool started_resolving; + /** client channel factory */ + grpc_client_channel_factory *client_channel_factory; /** mutex protecting client configuration, including all variables below in this data structure */ @@ -173,20 +176,28 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); if (chand->resolver_result != NULL) { - lb_policy = grpc_resolver_result_get_lb_policy(chand->resolver_result); + grpc_lb_policy_args lb_policy_args; + lb_policy_args.server_name = + grpc_resolver_result_get_server_name(chand->resolver_result); + lb_policy_args.addresses = + grpc_resolver_result_get_addresses(chand->resolver_result); + lb_policy_args.additional_args = + grpc_resolver_result_get_lb_policy_args(chand->resolver_result); + lb_policy_args.client_channel_factory = chand->client_channel_factory; + lb_policy = grpc_lb_policy_create( + exec_ctx, + grpc_resolver_result_get_lb_policy_name(chand->resolver_result), + &lb_policy_args); if (lb_policy != NULL) { - GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); GRPC_ERROR_UNREF(state_error); state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } - grpc_resolver_result_unref(exec_ctx, chand->resolver_result); + chand->resolver_result = NULL; } - chand->resolver_result = NULL; - if (lb_policy != NULL) { grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, chand->interested_parties); @@ -346,6 +357,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } + if (chand->client_channel_factory != NULL) { + grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); + } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, @@ -767,10 +781,12 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - grpc_resolver *resolver) { +void grpc_client_channel_finish_initialization( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + grpc_resolver *resolver, + grpc_client_channel_factory *client_channel_factory) { /* post construction initialization: set the transport setup pointer */ + GPR_ASSERT(client_channel_factory != NULL); grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu); @@ -784,6 +800,8 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result, &chand->on_resolver_result_changed); } + chand->client_channel_factory = client_channel_factory; + grpc_client_channel_factory_ref(client_channel_factory); gpr_mu_unlock(&chand->mu); } diff --git a/src/core/ext/client_config/client_channel.h b/src/core/ext/client_config/client_channel.h index 1e47ad34ad..abb5ac4d87 100644 --- a/src/core/ext/client_config/client_channel.h +++ b/src/core/ext/client_config/client_channel.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H #define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H +#include "src/core/ext/client_config/client_channel_factory.h" #include "src/core/ext/client_config/resolver.h" #include "src/core/lib/channel/channel_stack.h" @@ -46,12 +47,12 @@ extern const grpc_channel_filter grpc_client_channel_filter; -/* post-construction initializer to let the client channel know which - transport setup it should cancel upon destruction, or initiate when it needs - a connection */ -void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - grpc_resolver *resolver); +/* Post-construction initializer to give the client channel its resolver + and factory. */ +void grpc_client_channel_finish_initialization( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + grpc_resolver *resolver, + grpc_client_channel_factory *client_channel_factory); grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); diff --git a/src/core/ext/client_config/lb_policy_factory.c b/src/core/ext/client_config/lb_policy_factory.c index f86117aa38..c17af91a09 100644 --- a/src/core/ext/client_config/lb_policy_factory.c +++ b/src/core/ext/client_config/lb_policy_factory.c @@ -34,6 +34,7 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> #include "src/core/ext/client_config/lb_policy_factory.h" @@ -46,6 +47,25 @@ grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) { return addresses; } +grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses, + void* (*user_data_copy)(void*)) { + grpc_lb_addresses* new_addresses = + grpc_lb_addresses_create(addresses->num_addresses); + memcpy(new_addresses->addresses, addresses->addresses, + sizeof(grpc_lb_address) * addresses->num_addresses); + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (new_addresses->addresses[i].balancer_name != NULL) { + new_addresses->addresses[i].balancer_name = + gpr_strdup(new_addresses->addresses[i].balancer_name); + } + if (user_data_copy != NULL) { + new_addresses->addresses[i].user_data = + user_data_copy(new_addresses->addresses[i].user_data); + } + } + return new_addresses; +} + void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, void* address, size_t address_len, bool is_balancer, char* balancer_name, diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index 6247db2b61..ade55704f2 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -36,9 +36,9 @@ #include "src/core/ext/client_config/client_channel_factory.h" #include "src/core/ext/client_config/lb_policy.h" -#include "src/core/ext/client_config/resolver_result.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/resolve_address.h" typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; @@ -68,6 +68,11 @@ typedef struct grpc_lb_addresses { * \a num_addresses addresses. */ grpc_lb_addresses *grpc_lb_addresses_create(size_t num_addresses); +/** Creates a copy of \a addresses. If \a user_data_copy is not NULL, + * it will be invoked to copy the \a user_data field of each address. */ +grpc_lb_addresses *grpc_lb_addresses_copy(grpc_lb_addresses *addresses, + void *(*user_data_copy)(void *)); + /** Sets the value of the address at index \a index of \a addresses. * \a address is a socket address of length \a address_len. * Takes ownership of \a balancer_name. */ @@ -82,10 +87,14 @@ void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses, void (*user_data_destroy)(void *)); /** Arguments passed to LB policies. */ +/* TODO(roth, ctiller): Consider replacing this struct with + grpc_channel_args. See comment in resolver_result.h for details. */ typedef struct grpc_lb_policy_args { - char *server_name; + const char *server_name; grpc_lb_addresses *addresses; grpc_client_channel_factory *client_channel_factory; + /* Can be used to pass implementation-specific parameters to the LB policy. */ + grpc_channel_args *additional_args; } grpc_lb_policy_args; struct grpc_lb_policy_factory_vtable { diff --git a/src/core/ext/client_config/resolver_factory.h b/src/core/ext/client_config/resolver_factory.h index f69bf79564..9ec5b9a70e 100644 --- a/src/core/ext/client_config/resolver_factory.h +++ b/src/core/ext/client_config/resolver_factory.h @@ -47,10 +47,7 @@ struct grpc_resolver_factory { const grpc_resolver_factory_vtable *vtable; }; -typedef struct grpc_resolver_args { - grpc_uri *uri; - grpc_client_channel_factory *client_channel_factory; -} grpc_resolver_args; +typedef struct grpc_resolver_args { grpc_uri *uri; } grpc_resolver_args; struct grpc_resolver_factory_vtable { void (*ref)(grpc_resolver_factory *factory); diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c index e7a4abd568..b5308a140c 100644 --- a/src/core/ext/client_config/resolver_registry.c +++ b/src/core/ext/client_config/resolver_registry.c @@ -128,15 +128,13 @@ static grpc_resolver_factory *resolve_factory(const char *target, return factory; } -grpc_resolver *grpc_resolver_create( - const char *target, grpc_client_channel_factory *client_channel_factory) { +grpc_resolver *grpc_resolver_create(const char *target) { grpc_uri *uri = NULL; grpc_resolver_factory *factory = resolve_factory(target, &uri); grpc_resolver *resolver; grpc_resolver_args args; memset(&args, 0, sizeof(args)); args.uri = uri; - args.client_channel_factory = client_channel_factory; resolver = grpc_resolver_factory_create_resolver(factory, &args); grpc_uri_destroy(uri); return resolver; diff --git a/src/core/ext/client_config/resolver_registry.h b/src/core/ext/client_config/resolver_registry.h index 5ef1383cd3..92e248d548 100644 --- a/src/core/ext/client_config/resolver_registry.h +++ b/src/core/ext/client_config/resolver_registry.h @@ -55,8 +55,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); If a resolver factory was found, use it to instantiate a resolver and return it. If a resolver factory was not found, return NULL. */ -grpc_resolver *grpc_resolver_create( - const char *target, grpc_client_channel_factory *client_channel_factory); +grpc_resolver *grpc_resolver_create(const char *target); /** Find a resolver factory given a name and return an (owned-by-the-caller) * reference to it */ diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c index 242989a0da..63480d152b 100644 --- a/src/core/ext/client_config/resolver_result.c +++ b/src/core/ext/client_config/resolver_result.c @@ -34,40 +34,61 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/channel/channel_args.h" struct grpc_resolver_result { gpr_refcount refs; - grpc_lb_policy* lb_policy; + char* server_name; + grpc_lb_addresses* addresses; + char* lb_policy_name; + grpc_channel_args* lb_policy_args; }; -grpc_resolver_result* grpc_resolver_result_create() { - grpc_resolver_result* c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - gpr_ref_init(&c->refs, 1); - return c; +grpc_resolver_result* grpc_resolver_result_create( + const char* server_name, grpc_lb_addresses* addresses, + const char* lb_policy_name, grpc_channel_args* lb_policy_args) { + grpc_resolver_result* result = gpr_malloc(sizeof(*result)); + memset(result, 0, sizeof(*result)); + gpr_ref_init(&result->refs, 1); + result->server_name = gpr_strdup(server_name); + result->addresses = addresses; + result->lb_policy_name = gpr_strdup(lb_policy_name); + result->lb_policy_args = lb_policy_args; + return result; } -void grpc_resolver_result_ref(grpc_resolver_result* c) { gpr_ref(&c->refs); } +void grpc_resolver_result_ref(grpc_resolver_result* result) { + gpr_ref(&result->refs); +} void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, - grpc_resolver_result* c) { - if (gpr_unref(&c->refs)) { - if (c->lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "resolver_result"); - } - gpr_free(c); + grpc_resolver_result* result) { + if (gpr_unref(&result->refs)) { + gpr_free(result->server_name); + grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */); + gpr_free(result->lb_policy_name); + grpc_channel_args_destroy(result->lb_policy_args); + gpr_free(result); } } -void grpc_resolver_result_set_lb_policy(grpc_resolver_result* c, - grpc_lb_policy* lb_policy) { - GPR_ASSERT(c->lb_policy == NULL); - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "resolver_result"); - } - c->lb_policy = lb_policy; +const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result) { + return result->server_name; +} + +grpc_lb_addresses* grpc_resolver_result_get_addresses( + grpc_resolver_result* result) { + return result->addresses; +} + +const char* grpc_resolver_result_get_lb_policy_name( + grpc_resolver_result* result) { + return result->lb_policy_name; } -grpc_lb_policy* grpc_resolver_result_get_lb_policy(grpc_resolver_result* c) { - return c->lb_policy; +grpc_channel_args* grpc_resolver_result_get_lb_policy_args( + grpc_resolver_result* result) { + return result->lb_policy_args; } diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h index 5a69d81990..414c2e2482 100644 --- a/src/core/ext/client_config/resolver_result.h +++ b/src/core/ext/client_config/resolver_result.h @@ -32,22 +32,43 @@ #ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H #define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H -#include <stdbool.h> - -#include "src/core/ext/client_config/lb_policy.h" +#include "src/core/ext/client_config/lb_policy_factory.h" #include "src/core/lib/iomgr/resolve_address.h" +// TODO(roth, ctiller): In the long term, we are considering replacing +// the resolver_result data structure with grpc_channel_args. The idea is +// that the resolver will return a set of channel args that contains the +// information that is currently in the resolver_result struct. For +// example, there will be specific args indicating the set of addresses +// and the name of the LB policy to instantiate. Note that if we did +// this, we would probably want to change the data structure of +// grpc_channel_args such to a hash table or AVL or some other data +// structure that does not require linear search to find keys. + /// Results reported from a grpc_resolver. typedef struct grpc_resolver_result grpc_resolver_result; -grpc_resolver_result* grpc_resolver_result_create(); +/// Takes ownership of \a addresses and \a lb_policy_args. +grpc_resolver_result* grpc_resolver_result_create( + const char* server_name, grpc_lb_addresses* addresses, + const char* lb_policy_name, grpc_channel_args* lb_policy_args); void grpc_resolver_result_ref(grpc_resolver_result* result); void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, grpc_resolver_result* result); -void grpc_resolver_result_set_lb_policy(grpc_resolver_result* result, - grpc_lb_policy* lb_policy); -grpc_lb_policy* grpc_resolver_result_get_lb_policy( +/// Caller does NOT take ownership of result. +const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +grpc_lb_addresses* grpc_resolver_result_get_addresses( + grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +const char* grpc_resolver_result_get_lb_policy_name( + grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +grpc_channel_args* grpc_resolver_result_get_lb_policy_args( grpc_resolver_result* result); #endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */ diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 2c4364b259..1141844634 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -33,6 +33,7 @@ #include "src/core/ext/client_config/subchannel.h" +#include <limits.h> #include <string.h> #include <grpc/support/alloc.h> @@ -347,21 +348,16 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { - if (c->args->args[i].type == GRPC_ARG_INTEGER) { - if (c->args->args[i].value.integer >= 0) { - gpr_backoff_init( - &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_SUBCHANNEL_RECONNECT_JITTER, - GPR_MIN(c->args->args[i].value.integer, - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000), - c->args->args[i].value.integer); - } else { - gpr_log(GPR_ERROR, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS - " : must be non-negative"); - } - } else { - gpr_log(GPR_ERROR, - GRPC_ARG_MAX_RECONNECT_BACKOFF_MS " : must be an integer"); + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&c->args->args[i], options); + if (value >= 0) { + gpr_backoff_init( + &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_SUBCHANNEL_RECONNECT_JITTER, + GPR_MIN(value, + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000), + value); } } } diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h index a24dbe80fb..218bb43e0a 100644 --- a/src/core/ext/client_config/subchannel.h +++ b/src/core/ext/client_config/subchannel.h @@ -163,7 +163,7 @@ struct grpc_subchannel_args { /** Channel arguments to be supplied to the newly created channel */ const grpc_channel_args *args; /** Server name */ - char *server_name; + const char *server_name; /** Address to connect to */ struct sockaddr *addr; size_t addr_len; diff --git a/src/core/ext/client_config/subchannel_index.c b/src/core/ext/client_config/subchannel_index.c index 40ce91492d..673f85b8cb 100644 --- a/src/core/ext/client_config/subchannel_index.c +++ b/src/core/ext/client_config/subchannel_index.c @@ -132,7 +132,7 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, grpc_connector_unref(exec_ctx, k->connector); gpr_free((grpc_channel_args *)k->args.filters); grpc_channel_args_destroy((grpc_channel_args *)k->args.args); - gpr_free(k->args.server_name); + gpr_free((void *)k->args.server_name); gpr_free(k->args.addr); gpr_free(k); } diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index a54cbb0b63..4ea164e639 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -283,7 +283,7 @@ typedef struct glb_lb_policy { /** mutex protecting remaining members */ gpr_mu mu; - char *server_name; // Does not own. + const char *server_name; // Does not own. grpc_client_channel_factory *cc_factory; /** for communicating with the LB server */ diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index cc028c3a55..e8ac1b12ae 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -61,8 +61,6 @@ typedef struct { char *name_to_resolve; /** default port to use */ char *default_port; - /** subchannel factory */ - grpc_client_channel_factory *client_channel_factory; /** load balancing policy name */ char *lb_policy_name; @@ -169,32 +167,21 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { dns_resolver *r = arg; grpc_resolver_result *result = NULL; - grpc_lb_policy *lb_policy; gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); r->resolving = false; if (r->addresses != NULL) { - grpc_lb_policy_args lb_policy_args; - memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.server_name = r->target_name; - lb_policy_args.addresses = grpc_lb_addresses_create(r->addresses->naddrs); + grpc_lb_addresses *addresses = + grpc_lb_addresses_create(r->addresses->naddrs); for (size_t i = 0; i < r->addresses->naddrs; ++i) { grpc_lb_addresses_set_address( - lb_policy_args.addresses, i, &r->addresses->addrs[i].addr, + addresses, i, &r->addresses->addrs[i].addr, r->addresses->addrs[i].len, false /* is_balancer */, NULL /* balancer_name */, NULL /* user_data */); } grpc_resolved_addresses_destroy(r->addresses); - lb_policy_args.client_channel_factory = r->client_channel_factory; - lb_policy = - grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); - grpc_lb_addresses_destroy(lb_policy_args.addresses, - NULL /* user_data_destroy */); - result = grpc_resolver_result_create(); - if (lb_policy != NULL) { - grpc_resolver_result_set_lb_policy(result, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); - } + result = grpc_resolver_result_create(r->target_name, addresses, + r->lb_policy_name, NULL); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); @@ -255,7 +242,6 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { if (r->resolved_result) { grpc_resolver_result_unref(exec_ctx, r->resolved_result); } - grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory); gpr_free(r->target_name); gpr_free(r->name_to_resolve); gpr_free(r->default_port); @@ -284,10 +270,8 @@ static grpc_resolver *dns_create(grpc_resolver_args *args, r->target_name = gpr_strdup(path); r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name; r->default_port = gpr_strdup(default_port); - r->client_channel_factory = args->client_channel_factory; gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000); - grpc_client_channel_factory_ref(r->client_channel_factory); r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 34916e10be..74d2015e5c 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -40,7 +40,6 @@ #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> -#include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/parse_address.h" #include "src/core/ext/client_config/resolver_registry.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -52,8 +51,6 @@ typedef struct { grpc_resolver base; /** refcount */ gpr_refcount refs; - /** subchannel factory */ - grpc_client_channel_factory *client_channel_factory; /** load balancing policy name */ char *lb_policy_name; @@ -122,18 +119,10 @@ static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r) { if (r->next_completion != NULL && !r->published) { - grpc_resolver_result *result = grpc_resolver_result_create(); - grpc_lb_policy_args lb_policy_args; - memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.server_name = ""; - lb_policy_args.addresses = r->addresses; - lb_policy_args.client_channel_factory = r->client_channel_factory; - grpc_lb_policy *lb_policy = - grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); - grpc_resolver_result_set_lb_policy(result, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = true; - *r->target_result = result; + *r->target_result = grpc_resolver_result_create( + "", grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */), + r->lb_policy_name, NULL); grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; } @@ -142,7 +131,6 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); - grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory); grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */); gpr_free(r->lb_policy_name); gpr_free(r); @@ -244,8 +232,6 @@ static grpc_resolver *sockaddr_create( gpr_ref_init(&r->refs, 1); gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); - r->client_channel_factory = args->client_channel_factory; - grpc_client_channel_factory_ref(r->client_channel_factory); return &r->base; } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 14dc7f142f..c2b59569fd 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -161,7 +161,6 @@ typedef struct { grpc_client_channel_factory base; gpr_refcount refs; grpc_channel_args *merge_args; - grpc_channel *master; } client_channel_factory; static void client_channel_factory_ref( @@ -174,10 +173,6 @@ static void client_channel_factory_unref( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) { client_channel_factory *f = (client_channel_factory *)cc_factory; if (gpr_unref(&f->refs)) { - if (f->master != NULL) { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, - "client_channel_factory"); - } grpc_channel_args_destroy(f->merge_args); gpr_free(f); } @@ -218,15 +213,15 @@ static grpc_channel *client_channel_factory_create_channel( grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args, GRPC_CLIENT_CHANNEL, NULL); grpc_channel_args_destroy(final_args); - grpc_resolver *resolver = grpc_resolver_create(target, &f->base); + grpc_resolver *resolver = grpc_resolver_create(target); if (!resolver) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "client_channel_factory_create_channel"); return NULL; } - grpc_client_channel_set_resolver( - exec_ctx, grpc_channel_get_channel_stack(channel), resolver); + grpc_client_channel_finish_initialization( + exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel"); return channel; @@ -258,12 +253,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_channel *channel = client_channel_factory_create_channel( &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); - if (channel != NULL) { - f->master = channel; - GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create"); - } - grpc_client_channel_factory_unref(&exec_ctx, &f->base); + grpc_client_channel_factory_unref(&exec_ctx, &f->base); grpc_exec_ctx_finish(&exec_ctx); return channel != NULL ? channel : grpc_lame_client_channel_create( diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index a9616e92d0..31c54ff74c 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -221,7 +221,6 @@ typedef struct { gpr_refcount refs; grpc_channel_args *merge_args; grpc_channel_security_connector *security_connector; - grpc_channel *master; } client_channel_factory; static void client_channel_factory_ref( @@ -236,10 +235,6 @@ static void client_channel_factory_unref( if (gpr_unref(&f->refs)) { GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, "client_channel_factory"); - if (f->master != NULL) { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, - "client_channel_factory"); - } grpc_channel_args_destroy(f->merge_args); gpr_free(f); } @@ -284,10 +279,10 @@ static grpc_channel *client_channel_factory_create_channel( GRPC_CLIENT_CHANNEL, NULL); grpc_channel_args_destroy(final_args); - grpc_resolver *resolver = grpc_resolver_create(target, &f->base); + grpc_resolver *resolver = grpc_resolver_create(target); if (resolver != NULL) { - grpc_client_channel_set_resolver( - exec_ctx, grpc_channel_get_channel_stack(channel), resolver); + grpc_client_channel_finish_initialization( + exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create"); } else { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, @@ -364,10 +359,6 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, grpc_channel *channel = client_channel_factory_create_channel( &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); - if (channel != NULL) { - f->master = channel; - GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_secure_channel_create"); - } grpc_client_channel_factory_unref(&exec_ctx, &f->base); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index b6415e3f17..1dd7fef76f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -33,6 +33,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include <limits.h> #include <math.h> #include <stdio.h> #include <string.h> @@ -46,6 +47,7 @@ #include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/status_conversion.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" @@ -333,76 +335,65 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (is_client) { gpr_log(GPR_ERROR, "%s: is ignored on the client", GRPC_ARG_MAX_CONCURRENT_STREAMS); - } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_MAX_CONCURRENT_STREAMS); } else { - push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, - (uint32_t)channel_args->args[i].value.integer); + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + push_setting(exec_ctx, t, + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + (uint32_t)value); + } } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { - if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER); - } else if ((t->global.next_stream_id & 1) != - (channel_args->args[i].value.integer & 1)) { - gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, - t->global.next_stream_id & 1, - is_client ? "client" : "server"); - } else { - t->global.next_stream_id = - (uint32_t)channel_args->args[i].value.integer; + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + if ((t->global.next_stream_id & 1) != (value & 1)) { + gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, + t->global.next_stream_id & 1, + is_client ? "client" : "server"); + } else { + t->global.next_stream_id = (uint32_t)value; + } } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES)) { - if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES); - } else if (channel_args->args[i].value.integer <= 5) { - gpr_log(GPR_ERROR, "%s: must be at least 5", - GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES); - } else { - t->global.stream_lookahead = - (uint32_t)channel_args->args[i].value.integer; + const grpc_integer_options options = {-1, 5, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + t->global.stream_lookahead = (uint32_t)value; } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER)) { - if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); - } else if (channel_args->args[i].value.integer < 0) { - gpr_log(GPR_ERROR, "%s: must be non-negative", - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); - } else { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, - (uint32_t)channel_args->args[i].value.integer); + (uint32_t)value); } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { - if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER); - } else if (channel_args->args[i].value.integer < 0) { - gpr_log(GPR_ERROR, "%s: must be non-negative", - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER); - } else { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { grpc_chttp2_hpack_compressor_set_max_usable_size( - &t->writing.hpack_compressor, - (uint32_t)channel_args->args[i].value.integer); + &t->writing.hpack_compressor, (uint32_t)value); } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_MAX_METADATA_SIZE)) { - if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_MAX_METADATA_SIZE); - } else if (channel_args->args[i].value.integer < 0) { - gpr_log(GPR_ERROR, "%s: must be non-negative", - GRPC_ARG_MAX_METADATA_SIZE); - } else { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, - (uint32_t)channel_args->args[i].value.integer); + (uint32_t)value); } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_MAX_FRAME_SIZE)) { diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 79ceeb66b3..3a56b1ff20 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -271,3 +271,21 @@ int grpc_channel_args_compare(const grpc_channel_args *a, } return 0; } + +int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) { + if (arg->type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key); + return options.default_value; + } + if (arg->value.integer < options.min_value) { + gpr_log(GPR_ERROR, "%s ignored: it must be >= %d", arg->key, + options.min_value); + return options.default_value; + } + if (arg->value.integer > options.max_value) { + gpr_log(GPR_ERROR, "%s ignored: it must be <= %d", arg->key, + options.max_value); + return options.default_value; + } + return arg->value.integer; +} diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index aec61ee7c6..586a296d1f 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -89,4 +89,12 @@ uint32_t grpc_channel_args_compression_algorithm_get_states( int grpc_channel_args_compare(const grpc_channel_args *a, const grpc_channel_args *b); +typedef struct grpc_integer_options { + int default_value; // Return this if value is outside of expected bounds. + int min_value; + int max_value; +} grpc_integer_options; +/** Returns the value of \a arg, subject to the contraints in \a options. */ +int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options); + #endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c new file mode 100644 index 0000000000..6613785dea --- /dev/null +++ b/src/core/lib/channel/message_size_filter.c @@ -0,0 +1,165 @@ +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#include "src/core/lib/channel/message_size_filter.h" + +#include <limits.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/channel/channel_args.h" + +// The protobuf library will (by default) start warning at 100 megs. +#define DEFAULT_MAX_MESSAGE_LENGTH (4 * 1024 * 1024) + +typedef struct call_data { + // Receive closures are chained: we inject this closure as the + // recv_message_ready up-call on transport_stream_op, and remember to + // call our next_recv_message_ready member after handling it. + grpc_closure recv_message_ready; + // Used by recv_message_ready. + grpc_byte_stream** recv_message; + // Original recv_message_ready callback, invoked after our own. + grpc_closure* next_recv_message_ready; +} call_data; + +typedef struct channel_data { + size_t max_send_size; + size_t max_recv_size; +} channel_data; + +// Callback invoked when we receive a message. Here we check the max +// receive message size. +static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, + grpc_error* error) { + grpc_call_element* elem = user_data; + call_data* calld = elem->call_data; + channel_data* chand = elem->channel_data; + if (*calld->recv_message != NULL && + (*calld->recv_message)->length > chand->max_recv_size) { + char* message_string; + gpr_asprintf( + &message_string, "Received message larger than max (%u vs. %lu)", + (*calld->recv_message)->length, (unsigned long)chand->max_recv_size); + gpr_slice message = gpr_slice_from_copied_string(message_string); + gpr_free(message_string); + grpc_call_element_send_cancel_with_message( + exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message); + } + // Invoke the next callback. + grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL); +} + +// Start transport op. +static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + grpc_transport_stream_op* op) { + call_data* calld = elem->call_data; + channel_data* chand = elem->channel_data; + // Check max send message size. + if (op->send_message != NULL && + op->send_message->length > chand->max_send_size) { + char* message_string; + gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %lu)", + op->send_message->length, (unsigned long)chand->max_send_size); + gpr_slice message = gpr_slice_from_copied_string(message_string); + gpr_free(message_string); + grpc_call_element_send_cancel_with_message( + exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message); + } + // Inject callback for receiving a message. + if (op->recv_message_ready != NULL) { + calld->next_recv_message_ready = op->recv_message_ready; + calld->recv_message = op->recv_message; + op->recv_message_ready = &calld->recv_message_ready; + } + // Chain to the next filter. + grpc_call_next_op(exec_ctx, elem, op); +} + +// Constructor for call_data. +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + grpc_call_element_args* args) { + call_data* calld = elem->call_data; + calld->next_recv_message_ready = NULL; + grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem); + return GRPC_ERROR_NONE; +} + +// Destructor for call_data. +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + const grpc_call_final_info* final_info, + void* ignored) {} + +// Constructor for channel_data. +static void init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_channel_element_args* args) { + GPR_ASSERT(!args->is_last); + channel_data* chand = elem->channel_data; + memset(chand, 0, sizeof(*chand)); + chand->max_send_size = DEFAULT_MAX_MESSAGE_LENGTH; + chand->max_recv_size = DEFAULT_MAX_MESSAGE_LENGTH; + const grpc_integer_options options = {DEFAULT_MAX_MESSAGE_LENGTH, 0, INT_MAX}; + for (size_t i = 0; i < args->channel_args->num_args; ++i) { + if (strcmp(args->channel_args->args[i].key, + GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) { + chand->max_send_size = (size_t)grpc_channel_arg_get_integer( + &args->channel_args->args[i], options); + } + if (strcmp(args->channel_args->args[i].key, + GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { + chand->max_recv_size = (size_t)grpc_channel_arg_get_integer( + &args->channel_args->args[i], options); + } + } +} + +// Destructor for channel_data. +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) {} + +const grpc_channel_filter grpc_message_size_filter = { + start_transport_stream_op, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "message_size"}; diff --git a/src/core/lib/channel/message_size_filter.h b/src/core/lib/channel/message_size_filter.h new file mode 100644 index 0000000000..a88ff7f81a --- /dev/null +++ b/src/core/lib/channel/message_size_filter.h @@ -0,0 +1,39 @@ +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H +#define GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_message_size_filter; + +#endif /* GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H */ diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 119f5e82ab..979e0aeaea 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1163,17 +1163,6 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); } - } else if (call->receiving_stream->length > - grpc_channel_get_max_message_length(call->channel)) { - cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, - "Max message size exceeded"); - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); - call->receiving_stream = NULL; - *call->receiving_buffer = NULL; - call->receiving_message = 0; - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } } else { call->test_only_last_message_flags = call->receiving_stream->flags; if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 52e78567bd..6adb70a987 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -64,7 +64,6 @@ typedef struct registered_call { struct grpc_channel { int is_client; - uint32_t max_message_length; grpc_compression_options compression_options; grpc_mdelem *default_authority; @@ -80,9 +79,6 @@ struct grpc_channel { #define CHANNEL_FROM_TOP_ELEM(top_elem) \ CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) -/* the protobuf library will (by default) start warning at 100megs */ -#define DEFAULT_MAX_MESSAGE_LENGTH (4 * 1024 * 1024) - static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); @@ -114,21 +110,10 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; - channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; grpc_compression_options_init(&channel->compression_options); if (args) { for (size_t i = 0; i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { - if (args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s ignored: it must be an integer", - GRPC_ARG_MAX_MESSAGE_LENGTH); - } else if (args->args[i].value.integer < 0) { - gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", - GRPC_ARG_MAX_MESSAGE_LENGTH); - } else { - channel->max_message_length = (uint32_t)args->args[i].value.integer; - } - } else if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { if (args->args[i].type != GRPC_ARG_STRING) { gpr_log(GPR_ERROR, "%s ignored: it must be a string", GRPC_ARG_DEFAULT_AUTHORITY); @@ -370,7 +355,3 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { return grpc_mdelem_from_metadata_strings(GRPC_MDSTR_GRPC_STATUS, grpc_mdstr_from_string(tmp)); } - -uint32_t grpc_channel_get_max_message_length(grpc_channel *channel) { - return channel->max_message_length; -} diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 4c62974346..23cc8656ca 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -64,7 +64,6 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); The returned elem is owned by the caller. */ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); -uint32_t grpc_channel_get_max_message_length(grpc_channel *channel); #ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index edda0c85fa..64bdfc3446 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -45,6 +45,7 @@ #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_server_filter.h" +#include "src/core/lib/channel/message_size_filter.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/combiner.h" @@ -99,6 +100,15 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder, static void register_builtin_channel_init() { grpc_channel_init_register_stage( + GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + prepend_filter, (void *)&grpc_message_size_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + prepend_filter, (void *)&grpc_message_size_filter); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, + (void *)&grpc_message_size_filter); + grpc_channel_init_register_stage( GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, (void *)&grpc_compress_filter); grpc_channel_init_register_stage( |