diff options
Diffstat (limited to 'src')
89 files changed, 1213 insertions, 570 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 54ee75af28..591135cd6f 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -132,7 +132,7 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( for (i = 0; i < a->num_args; ++i) { if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) { - return a->args[i].value.integer; + return (grpc_compression_algorithm)a->args[i].value.integer; break; } } @@ -177,9 +177,9 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( if (states_arg_found) { if (state != 0) { - GPR_BITSET(states_arg, algorithm); + GPR_BITSET((unsigned *)states_arg, algorithm); } else { - GPR_BITCLEAR(states_arg, algorithm); + GPR_BITCLEAR((unsigned *)states_arg, algorithm); } } else { /* create a new arg */ @@ -189,9 +189,9 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( /* all enabled by default */ tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; if (state != 0) { - GPR_BITSET(&tmp.value.integer, algorithm); + GPR_BITSET((unsigned *)&tmp.value.integer, algorithm); } else { - GPR_BITCLEAR(&tmp.value.integer, algorithm); + GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm); } result = grpc_channel_args_copy_and_add(*a, &tmp, 1); grpc_channel_args_destroy(*a); diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index cd7c182ef2..4eb5df5de3 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -57,7 +57,7 @@ int grpc_trace_channel = 0; /* Given a size, round up to the next multiple of sizeof(void*) */ #define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ - (((x) + GPR_MAX_ALIGNMENT - 1) & ~(GPR_MAX_ALIGNMENT - 1)) + (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u)) size_t grpc_channel_stack_size(const grpc_channel_filter **filters, size_t filter_count) { diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 762a4edc73..7959603102 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -48,7 +48,8 @@ typedef struct call_data { gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; - int remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */ + gpr_uint32 + remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */ int written_initial_metadata; /**< Already processed initial md? */ /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ @@ -70,6 +71,8 @@ typedef struct channel_data { grpc_mdelem *mdelem_accept_encoding; /** The default, channel-level, compression algorithm */ grpc_compression_algorithm default_compression_algorithm; + /** Compression options for the channel */ + grpc_compression_options compression_options; } channel_data; /** Compress \a slices in place using \a algorithm. Returns 1 if compression did @@ -102,7 +105,17 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), &calld->compression_algorithm)) { - gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.", + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (unknown). Ignoring.", + md_c_str); + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + if (grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, calld->compression_algorithm) == + 0) { + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (previously disabled). " + "Ignoring.", md_c_str); calld->compression_algorithm = GRPC_COMPRESS_NONE; } @@ -141,8 +154,9 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, grpc_stream_op *sop = &send_ops->ops[i]; switch (sop->type) { case GRPC_OP_BEGIN_MESSAGE: + GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX); grpc_sopb_add_begin_message( - &new_send_ops, calld->slices.length, + &new_send_ops, (gpr_uint32)calld->slices.length, sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS); break; case GRPC_OP_SLICE: @@ -228,7 +242,10 @@ static void process_send_ops(grpc_call_element *elem, GPR_ASSERT(calld->remaining_slice_bytes > 0); /* Increase input ref count, gpr_slice_buffer_add takes ownership. */ gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice)); - calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice); + GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) >= + calld->remaining_slice_bytes); + calld->remaining_slice_bytes -= + (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice); if (calld->remaining_slice_bytes == 0) { did_compress = compress_send_sb(calld->compression_algorithm, &calld->slices); @@ -294,11 +311,21 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; + size_t supported_algorithms_idx = 0; char *accept_encoding_str; size_t accept_encoding_str_len; + grpc_compression_options_init(&channeld->compression_options); + channeld->compression_options.enabled_algorithms_bitset = + (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args); + channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm(args); + /* Make sure the default isn't disabled. */ + GPR_ASSERT(grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, channeld->default_compression_algorithm)); + channeld->compression_options.default_compression_algorithm = + channeld->default_compression_algorithm; channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0); @@ -311,6 +338,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { char *algorithm_name; + /* skip disabled algorithms */ + if (grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, algo_idx) == 0) { + continue; + } GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings( @@ -318,15 +350,15 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string(mdctx, algorithm_name, 0)); if (algo_idx > 0) { - supported_algorithms_names[algo_idx - 1] = algorithm_name; + supported_algorithms_names[supported_algorithms_idx++] = algorithm_name; } } /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated * arrays, as to avoid the heap allocs */ - accept_encoding_str = gpr_strjoin_sep( - supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names), - ", ", &accept_encoding_str_len); + accept_encoding_str = + gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",", + &accept_encoding_str_len); channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index a8cd5fc149..c1e583e4a5 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -47,3 +47,7 @@ void grpc_connector_connect(grpc_connector *connector, grpc_iomgr_closure *notify) { connector->vtable->connect(connector, in_args, out_args, notify); } + +void grpc_connector_shutdown(grpc_connector *connector) { + connector->vtable->shutdown(connector); +} diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index edcb10a36e..01aa716412 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -50,7 +50,7 @@ typedef struct { grpc_pollset_set *interested_parties; /** address to connect to */ const struct sockaddr *addr; - int addr_len; + size_t addr_len; /** deadline for connection */ gpr_timespec deadline; /** channel arguments (to be passed to transport) */ @@ -70,6 +70,9 @@ typedef struct { struct grpc_connector_vtable { void (*ref)(grpc_connector *connector); void (*unref)(grpc_connector *connector); + /** Implementation of grpc_connector_shutdown */ + void (*shutdown)(grpc_connector *connector); + /** Implementation of grpc_connector_connect */ void (*connect)(grpc_connector *connector, const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args, grpc_iomgr_closure *notify); @@ -77,9 +80,12 @@ struct grpc_connector_vtable { void grpc_connector_ref(grpc_connector *connector); void grpc_connector_unref(grpc_connector *connector); +/** Connect using the connector: max one outstanding call at a time */ void grpc_connector_connect(grpc_connector *connector, const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args, grpc_iomgr_closure *notify); +/** Cancel any pending connection */ +void grpc_connector_shutdown(grpc_connector *connector); #endif diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 5ae2e0ea52..c8262e92ef 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -31,6 +31,7 @@ * */ +#include "src/core/client_config/lb_policy_factory.h" #include "src/core/client_config/lb_policies/pick_first.h" #include <string.h> @@ -314,19 +315,34 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_check_connectivity, pf_notify_on_state_change}; -grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, - size_t num_subchannels) { +static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} + +static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} + +static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(num_subchannels); + GPR_ASSERT(args->num_subchannels > 0); memset(p, 0, sizeof(*p)); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); - p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels); - p->num_subchannels = num_subchannels; + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); + p->num_subchannels = args->num_subchannels; grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); - memcpy(p->subchannels, subchannels, - sizeof(grpc_subchannel *) * num_subchannels); + memcpy(p->subchannels, args->subchannels, + sizeof(grpc_subchannel *) * args->num_subchannels); grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); gpr_mu_init(&p->mu); return &p->base; } + +static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = { + pick_first_factory_ref, pick_first_factory_unref, create_pick_first, + "pick_first"}; + +static grpc_lb_policy_factory pick_first_lb_policy_factory = { + &pick_first_factory_vtable}; + +grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() { + return &pick_first_lb_policy_factory; +} diff --git a/src/core/client_config/lb_policies/pick_first.h b/src/core/client_config/lb_policies/pick_first.h index 31394985e5..3ca53ad42a 100644 --- a/src/core/client_config/lb_policies/pick_first.h +++ b/src/core/client_config/lb_policies/pick_first.h @@ -34,11 +34,10 @@ #ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_PICK_FIRST_H #define GRPC_INTERNAL_CORE_CLIENT_CONFIG_PICK_FIRST_H -#include "src/core/client_config/lb_policy.h" +#include "src/core/client_config/lb_policy_factory.h" -/** Returns a load balancing policy instance that picks up the first subchannel - * from \a subchannels to succesfully connect */ -grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, - size_t num_subchannels); +/** Returns a load balancing factory for the pick first policy, which picks up + * the first subchannel from \a subchannels to succesfully connect */ +grpc_lb_policy_factory *grpc_pick_first_lb_factory_create(); #endif diff --git a/src/core/client_config/lb_policy_factory.c b/src/core/client_config/lb_policy_factory.c new file mode 100644 index 0000000000..0c097e0542 --- /dev/null +++ b/src/core/client_config/lb_policy_factory.c @@ -0,0 +1,47 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/lb_policy_factory.h" + +void grpc_lb_policy_factory_ref(grpc_lb_policy_factory *factory) { + factory->vtable->ref(factory); +} +void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory) { + factory->vtable->unref(factory); +} + +grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + if (factory == NULL) return NULL; + return factory->vtable->create_lb_policy(factory, args); +} diff --git a/src/core/client_config/lb_policy_factory.h b/src/core/client_config/lb_policy_factory.h new file mode 100644 index 0000000000..04610316ee --- /dev/null +++ b/src/core/client_config/lb_policy_factory.h @@ -0,0 +1,73 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_FACTORY_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_FACTORY_H + +#include "src/core/client_config/lb_policy.h" +#include "src/core/client_config/subchannel.h" + +typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; +typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; + +/** grpc_lb_policy provides grpc_client_config objects to grpc_channel + objects */ +struct grpc_lb_policy_factory { + const grpc_lb_policy_factory_vtable *vtable; +}; + +typedef struct grpc_lb_policy_args { + grpc_subchannel **subchannels; + size_t num_subchannels; +} grpc_lb_policy_args; + +struct grpc_lb_policy_factory_vtable { + void (*ref)(grpc_lb_policy_factory *factory); + void (*unref)(grpc_lb_policy_factory *factory); + + /** Implementation of grpc_lb_policy_factory_create_lb_policy */ + grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args); + + /** Name for the LB policy this factory implements */ + const char *name; +}; + +void grpc_lb_policy_factory_ref(grpc_lb_policy_factory *factory); +void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory); + +/** Create a lb_policy instance. */ +grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args); + +#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_FACTORY_H */ diff --git a/src/core/client_config/lb_policy_registry.c b/src/core/client_config/lb_policy_registry.c new file mode 100644 index 0000000000..ae4a077ef3 --- /dev/null +++ b/src/core/client_config/lb_policy_registry.c @@ -0,0 +1,88 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/lb_policy_registry.h" + +#include <string.h> + +#define MAX_POLICIES 10 + +static grpc_lb_policy_factory *g_all_of_the_lb_policies[MAX_POLICIES]; +static int g_number_of_lb_policies = 0; + +static grpc_lb_policy_factory *g_default_lb_policy_factory; + +void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory) { + g_number_of_lb_policies = 0; + g_default_lb_policy_factory = default_factory; +} + +void grpc_lb_policy_registry_shutdown(void) { + int i; + for (i = 0; i < g_number_of_lb_policies; i++) { + grpc_lb_policy_factory_unref(g_all_of_the_lb_policies[i]); + } +} + +void grpc_register_lb_policy(grpc_lb_policy_factory *factory) { + int i; + for (i = 0; i < g_number_of_lb_policies; i++) { + GPR_ASSERT(0 != strcmp(factory->vtable->name, + g_all_of_the_lb_policies[i]->vtable->name)); + } + GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES); + grpc_lb_policy_factory_ref(factory); + g_all_of_the_lb_policies[g_number_of_lb_policies++] = factory; +} + +static grpc_lb_policy_factory *lookup_factory(const char* name) { + int i; + + if (name == NULL) return NULL; + + for (i = 0; i < g_number_of_lb_policies; i++) { + if (0 == strcmp(name, g_all_of_the_lb_policies[i]->vtable->name)) { + return g_all_of_the_lb_policies[i]; + } + } + + return NULL; +} + +grpc_lb_policy *grpc_lb_policy_create(const char *name, + grpc_lb_policy_args *args) { + grpc_lb_policy_factory *factory = lookup_factory(name); + grpc_lb_policy *lb_policy = grpc_lb_policy_factory_create_lb_policy( + factory, args); + return lb_policy; +} diff --git a/src/core/client_config/lb_policy_registry.h b/src/core/client_config/lb_policy_registry.h new file mode 100644 index 0000000000..96fc2a1628 --- /dev/null +++ b/src/core/client_config/lb_policy_registry.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H + +#include "src/core/client_config/lb_policy_factory.h" + +/** Initialize the registry and set \a default_factory as the factory to be + * returned when no name is provided in a lookup */ +void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory); +void grpc_lb_policy_registry_shutdown(void); + +/** Register a LB policy factory. */ +void grpc_register_lb_policy(grpc_lb_policy_factory *factory); + +/** Create a \a grpc_lb_policy instance. + * + * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init + * will be returned. */ +grpc_lb_policy *grpc_lb_policy_create(const char *name, + grpc_lb_policy_args *args); + +#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 84643c464a..ccec07a08c 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -39,7 +39,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/string_util.h> -#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_registry.h" #include "src/core/client_config/subchannel_factory_decorators/add_channel_arg.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" @@ -55,9 +55,8 @@ typedef struct { char *default_port; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; - /** load balancing policy factory */ - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels); + /** load balancing policy name */ + char *lb_policy_name; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -135,16 +134,19 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { grpc_lb_policy *lb_policy; size_t i; if (addresses) { + grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); for (i = 0; i < addresses->naddrs; i++) { memset(&args, 0, sizeof(args)); args.addr = (struct sockaddr *)(addresses->addrs[i].addr); - args.addr_len = addresses->addrs[i].len; + args.addr_len = (size_t)addresses->addrs[i].len; subchannels[i] = grpc_subchannel_factory_create_subchannel( r->subchannel_factory, &args); } - lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(config, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); @@ -193,13 +195,13 @@ static void dns_destroy(grpc_resolver *gr) { grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r->name); gpr_free(r->default_port); + gpr_free(r->lb_policy_name); gpr_free(r); } static grpc_resolver *dns_create( grpc_uri *uri, const char *default_port, - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels), + const char* lb_policy_name, grpc_subchannel_factory *subchannel_factory) { dns_resolver *r; const char *path = uri->path; @@ -220,7 +222,7 @@ static grpc_resolver *dns_create( r->default_port = gpr_strdup(default_port); r->subchannel_factory = subchannel_factory; grpc_subchannel_factory_ref(subchannel_factory); - r->lb_policy_factory = lb_policy_factory; + r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } @@ -235,8 +237,7 @@ static void dns_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *dns_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { - return dns_create(uri, "https", grpc_create_pick_first_lb_policy, - subchannel_factory); + return dns_create(uri, "https", "pick_first", subchannel_factory); } char *dns_factory_get_default_host_name(grpc_resolver_factory *factory, diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 0d8540a566..2900df285c 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -45,7 +45,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/string_util.h> -#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_registry.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" @@ -56,14 +56,13 @@ typedef struct { gpr_refcount refs; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; - /** load balancing policy factory */ - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels); + /** load balancing policy name */ + char *lb_policy_name; /** the addresses that we've 'resolved' */ struct sockaddr_storage *addrs; /** the corresponding length of the addresses */ - int *addrs_len; + size_t *addrs_len; /** how many elements in \a addrs */ size_t num_addrs; @@ -122,6 +121,7 @@ static void sockaddr_next(grpc_resolver *resolver, static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { grpc_client_config *cfg; grpc_lb_policy *lb_policy; + grpc_lb_policy_args lb_policy_args; grpc_subchannel **subchannels; grpc_subchannel_args args; @@ -136,7 +136,10 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { subchannels[i] = grpc_subchannel_factory_create_subchannel( r->subchannel_factory, &args); } - lb_policy = r->lb_policy_factory(subchannels, r->num_addrs); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = r->num_addrs; + lb_policy = + grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); gpr_free(subchannels); grpc_client_config_set_lb_policy(cfg, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "unix"); @@ -153,11 +156,13 @@ static void sockaddr_destroy(grpc_resolver *gr) { grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r->addrs); gpr_free(r->addrs_len); + gpr_free(r->lb_policy_name); gpr_free(r); } #ifdef GPR_POSIX_SOCKET -static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { +static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { struct sockaddr_un *un = (struct sockaddr_un *)addr; un->sun_family = AF_UNIX; @@ -189,7 +194,8 @@ static char *ipv6_get_default_authority(grpc_resolver_factory *factory, return ip_get_default_authority(uri); } -static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { +static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { const char *host_port = uri->path; char *host; char *port; @@ -216,7 +222,7 @@ static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); goto done; } - in->sin_port = htons(port_num); + in->sin_port = htons((gpr_uint16)port_num); } else { gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); goto done; @@ -229,7 +235,8 @@ done: return result; } -static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { +static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { const char *host_port = uri->path; char *host; char *port; @@ -256,7 +263,7 @@ static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); goto done; } - in6->sin6_port = htons(port_num); + in6->sin6_port = htons((gpr_uint16)port_num); } else { gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); goto done; @@ -271,11 +278,9 @@ done: static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( - grpc_uri *uri, - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels), + grpc_uri *uri, const char *lb_policy_name, grpc_subchannel_factory *subchannel_factory, - int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) { + int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { size_t i; int errors_found = 0; /* GPR_FALSE */ sockaddr_resolver *r; @@ -296,7 +301,7 @@ static grpc_resolver *sockaddr_create( gpr_slice_split(path_slice, ",", &path_parts); r->num_addrs = path_parts.count; r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs); - r->addrs_len = gpr_malloc(sizeof(int) * r->num_addrs); + r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs); for(i = 0; i < r->num_addrs; i++) { grpc_uri ith_uri = *uri; @@ -320,7 +325,7 @@ static grpc_resolver *sockaddr_create( gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); r->subchannel_factory = subchannel_factory; - r->lb_policy_factory = lb_policy_factory; + r->lb_policy_name = gpr_strdup(lb_policy_name); grpc_subchannel_factory_ref(subchannel_factory); return &r->base; @@ -338,7 +343,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *name##_factory_create_resolver( \ grpc_resolver_factory *factory, grpc_uri *uri, \ grpc_subchannel_factory *subchannel_factory) { \ - return sockaddr_create(uri, grpc_create_pick_first_lb_policy, \ + return sockaddr_create(uri, "pick_first", \ subchannel_factory, parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index da399f9954..2594e6fae9 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -41,7 +41,7 @@ #include <grpc/grpc_zookeeper.h> #include <zookeeper/zookeeper.h> -#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_registry.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" @@ -59,9 +59,8 @@ typedef struct { char *name; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; - /** load balancing policy factory */ - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels); + /** load balancing policy name */ + char *lb_policy_name; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -183,6 +182,7 @@ static void zookeeper_on_resolved(void *arg, grpc_lb_policy *lb_policy; size_t i; if (addresses != NULL) { + grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); for (i = 0; i < addresses->naddrs; i++) { @@ -192,7 +192,10 @@ static void zookeeper_on_resolved(void *arg, subchannels[i] = grpc_subchannel_factory_create_subchannel( r->subchannel_factory, &args); } - lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy = + grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(config, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); @@ -244,7 +247,7 @@ static void zookeeper_dns_resolved(void *arg, } /** Parses JSON format address of a zookeeper node */ -static char *zookeeper_parse_address(const char *value, int value_len) { +static char *zookeeper_parse_address(const char *value, size_t value_len) { grpc_json *json; grpc_json *cur; const char *host; @@ -294,7 +297,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, return; } - address = zookeeper_parse_address(value, value_len); + address = zookeeper_parse_address(value, (size_t)value_len); if (address != NULL) { /** Further resolves address by DNS */ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); @@ -364,7 +367,7 @@ static void zookeeper_get_node_completion(int rc, const char *value, /** If zookeeper node of path r->name does not have address (i.e. service node), get its children */ - address = zookeeper_parse_address(value, value_len); + address = zookeeper_parse_address(value, (size_t)value_len); if (address != NULL) { r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); r->resolved_addrs->addrs = NULL; @@ -420,13 +423,12 @@ static void zookeeper_destroy(grpc_resolver *gr) { } grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r->name); + gpr_free(r->lb_policy_name); gpr_free(r); } static grpc_resolver *zookeeper_create( - grpc_uri *uri, - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels), + grpc_uri *uri, const char *lb_policy_name, grpc_subchannel_factory *subchannel_factory) { zookeeper_resolver *r; size_t length; @@ -451,7 +453,7 @@ static grpc_resolver *zookeeper_create( r->name = gpr_strdup(path); r->subchannel_factory = subchannel_factory; - r->lb_policy_factory = lb_policy_factory; + r->lb_policy_name = gpr_strdup(lb_policy_name); grpc_subchannel_factory_ref(subchannel_factory); /** Initializes zookeeper client */ @@ -490,8 +492,7 @@ static char *zookeeper_factory_get_default_hostname( static grpc_resolver *zookeeper_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { - return zookeeper_create(uri, grpc_create_pick_first_lb_policy, - subchannel_factory); + return zookeeper_create(uri, "pick_first", subchannel_factory); } static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index ca52c75beb..876d2aa418 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -439,6 +439,10 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, if (cancel_alarm) { grpc_alarm_cancel(&c->alarm); } + + if (op->disconnect) { + grpc_connector_shutdown(c->connector); + } } static void on_state_changed(void *p, int iomgr_success) { diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c index 410a61c8cf..2738e2df57 100644 --- a/src/core/client_config/uri_parser.c +++ b/src/core/client_config/uri_parser.c @@ -39,10 +39,13 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section, +/** a size_t default value... maps to all 1's */ +#define NOT_SET (~(size_t)0) + +static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section, int suppress_errors) { char *line_prefix; - int pfx_len; + size_t pfx_len; if (!suppress_errors) { gpr_asprintf(&line_prefix, "bad uri.%s: '", section); @@ -60,22 +63,90 @@ static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section, return NULL; } -static char *copy_fragment(const char *src, int begin, int end) { +/** Returns a copy of \a src[begin, end) */ +static char *copy_component(const char *src, size_t begin, size_t end) { char *out = gpr_malloc(end - begin + 1); memcpy(out, src + begin, end - begin); out[end - begin] = 0; return out; } +/** Returns how many chars to advance if \a uri_text[i] begins a valid \a pchar + * production. If \a uri_text[i] introduces an invalid \a pchar (such as percent + * sign not followed by two hex digits), NOT_SET is returned. */ +static size_t parse_pchar(const char *uri_text, size_t i) { + /* pchar = unreserved / pct-encoded / sub-delims / ":" / "@" + * unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~" + * pct-encoded = "%" HEXDIG HEXDIG + * sub-delims = "!" / "$" / "&" / "'" / "(" / ")" + / "*" / "+" / "," / ";" / "=" */ + char c = uri_text[i]; + if (((c >= 'A') && (c <= 'Z')) || ((c >= 'a') && (c <= 'z')) || + ((c >= '0') && (c <= '9')) || + (c == '-' || c == '.' || c == '_' || c == '~') || /* unreserved */ + + (c == '!' || c == '$' || c == '&' || c == '\'' || c == '$' || c == '&' || + c == '(' || c == ')' || c == '*' || c == '+' || c == ',' || c == ';' || + c == '=') /* sub-delims */) { + return 1; + } + if (c == '%') { /* pct-encoded */ + size_t j; + if (uri_text[i + 1] == 0 || uri_text[i + 2] == 0) { + return NOT_SET; + } + for (j = i + 1; j < 2; j++) { + c = uri_text[j]; + if (!(((c >= '0') && (c <= '9')) || ((c >= 'a') && (c <= 'f')) || + ((c >= 'A') && (c <= 'F')))) { + return NOT_SET; + } + } + return 2; + } + return 0; +} + +/* *( pchar / "?" / "/" ) */ +static int parse_fragment_or_query(const char *uri_text, size_t *i) { + char c; + while ((c = uri_text[*i]) != 0) { + const size_t advance = parse_pchar(uri_text, *i); /* pchar */ + switch (advance) { + case 0: /* uri_text[i] isn't in pchar */ + /* maybe it's ? or / */ + if (uri_text[*i] == '?' || uri_text[*i] == '/') { + (*i)++; + break; + } else { + return 1; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + default: + (*i) += advance; + break; + case NOT_SET: /* uri_text[i] introduces an invalid URI */ + return 0; + } + } + /* *i is the first uri_text position past the \a query production, maybe \0 */ + return 1; +} + grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { grpc_uri *uri; - int scheme_begin = 0; - int scheme_end = -1; - int authority_begin = -1; - int authority_end = -1; - int path_begin = -1; - int path_end = -1; - int i; + size_t scheme_begin = 0; + size_t scheme_end = NOT_SET; + size_t authority_begin = NOT_SET; + size_t authority_end = NOT_SET; + size_t path_begin = NOT_SET; + size_t path_end = NOT_SET; + size_t query_begin = NOT_SET; + size_t query_end = NOT_SET; + size_t fragment_begin = NOT_SET; + size_t fragment_end = NOT_SET; + size_t i; for (i = scheme_begin; uri_text[i] != 0; i++) { if (uri_text[i] == ':') { @@ -92,27 +163,22 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { } break; } - if (scheme_end == -1) { + if (scheme_end == NOT_SET) { return bad_uri(uri_text, i, "scheme", suppress_errors); } if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') { authority_begin = scheme_end + 3; - for (i = authority_begin; uri_text[i] != 0 && authority_end == -1; i++) { - if (uri_text[i] == '/') { + for (i = authority_begin; uri_text[i] != 0 && authority_end == NOT_SET; + i++) { + if (uri_text[i] == '/' || uri_text[i] == '?' || uri_text[i] == '#') { authority_end = i; } - if (uri_text[i] == '?') { - return bad_uri(uri_text, i, "query_not_supported", suppress_errors); - } - if (uri_text[i] == '#') { - return bad_uri(uri_text, i, "fragment_not_supported", suppress_errors); - } } - if (authority_end == -1 && uri_text[i] == 0) { + if (authority_end == NOT_SET && uri_text[i] == 0) { authority_end = i; } - if (authority_end == -1) { + if (authority_end == NOT_SET) { return bad_uri(uri_text, i, "authority", suppress_errors); } /* TODO(ctiller): parse the authority correctly */ @@ -122,20 +188,46 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { } for (i = path_begin; uri_text[i] != 0; i++) { - if (uri_text[i] == '?') { - return bad_uri(uri_text, i, "query_not_supported", suppress_errors); + if (uri_text[i] == '?' || uri_text[i] == '#') { + path_end = i; + break; } - if (uri_text[i] == '#') { - return bad_uri(uri_text, i, "fragment_not_supported", suppress_errors); + } + if (path_end == NOT_SET && uri_text[i] == 0) { + path_end = i; + } + if (path_end == NOT_SET) { + return bad_uri(uri_text, i, "path", suppress_errors); + } + + if (uri_text[i] == '?') { + query_begin = ++i; + if (!parse_fragment_or_query(uri_text, &i)) { + return bad_uri(uri_text, i, "query", suppress_errors); + } else if (uri_text[i] != 0 && uri_text[i] != '#') { + /* We must be at the end or at the beginning of a fragment */ + return bad_uri(uri_text, i, "query", suppress_errors); + } + query_end = i; + } + if (uri_text[i] == '#') { + fragment_begin = ++i; + if (!parse_fragment_or_query(uri_text, &i)) { + return bad_uri(uri_text, i - fragment_end, "fragment", suppress_errors); + } else if (uri_text[i] != 0) { + /* We must be at the end */ + return bad_uri(uri_text, i, "fragment", suppress_errors); } + fragment_end = i; } - path_end = i; uri = gpr_malloc(sizeof(*uri)); memset(uri, 0, sizeof(*uri)); - uri->scheme = copy_fragment(uri_text, scheme_begin, scheme_end); - uri->authority = copy_fragment(uri_text, authority_begin, authority_end); - uri->path = copy_fragment(uri_text, path_begin, path_end); + uri->scheme = copy_component(uri_text, scheme_begin, scheme_end); + uri->authority = copy_component(uri_text, authority_begin, authority_end); + uri->path = copy_component(uri_text, path_begin, path_end); + uri->query = copy_component(uri_text, query_begin, query_end); + uri->fragment = copy_component(uri_text, fragment_begin, fragment_end); return uri; } @@ -145,5 +237,7 @@ void grpc_uri_destroy(grpc_uri *uri) { gpr_free(uri->scheme); gpr_free(uri->authority); gpr_free(uri->path); + gpr_free(uri->query); + gpr_free(uri->fragment); gpr_free(uri); } diff --git a/src/core/client_config/uri_parser.h b/src/core/client_config/uri_parser.h index ce4e6aecb0..b8daa13bd4 100644 --- a/src/core/client_config/uri_parser.h +++ b/src/core/client_config/uri_parser.h @@ -38,6 +38,8 @@ typedef struct { char *scheme; char *authority; char *path; + char *query; + char *fragment; } grpc_uri; /** parse a uri, return NULL on failure */ diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index 6ed6dbe93f..76d42fde0f 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -33,7 +33,9 @@ #include <stdlib.h> #include <string.h> + #include <grpc/compression.h> +#include <grpc/support/useful.h> int grpc_compression_algorithm_parse(const char *name, size_t name_length, grpc_compression_algorithm *algorithm) { @@ -102,3 +104,24 @@ grpc_compression_level grpc_compression_level_for_algorithm( } abort(); } + +void grpc_compression_options_init(grpc_compression_options *opts) { + opts->enabled_algorithms_bitset = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT)-1; + opts->default_compression_algorithm = GRPC_COMPRESS_NONE; +} + +void grpc_compression_options_enable_algorithm( + grpc_compression_options *opts, grpc_compression_algorithm algorithm) { + GPR_BITSET(&opts->enabled_algorithms_bitset, algorithm); +} + +void grpc_compression_options_disable_algorithm( + grpc_compression_options *opts, grpc_compression_algorithm algorithm) { + GPR_BITCLEAR(&opts->enabled_algorithms_bitset, algorithm); +} + +int grpc_compression_options_is_algorithm_enabled( + const grpc_compression_options *opts, + grpc_compression_algorithm algorithm) { + return GPR_BITGET(opts->enabled_algorithms_bitset, algorithm); +} diff --git a/src/core/compression/message_compress.c b/src/core/compression/message_compress.c index 7856f40dd1..01db7134c3 100644 --- a/src/core/compression/message_compress.c +++ b/src/core/compression/message_compress.c @@ -49,19 +49,23 @@ static int zlib_body(z_stream *zs, gpr_slice_buffer *input, int flush; size_t i; gpr_slice outbuf = gpr_slice_malloc(OUTPUT_BLOCK_SIZE); + const uInt uint_max = ~(uInt)0; - zs->avail_out = GPR_SLICE_LENGTH(outbuf); + GPR_ASSERT(GPR_SLICE_LENGTH(outbuf) <= uint_max); + zs->avail_out = (uInt)GPR_SLICE_LENGTH(outbuf); zs->next_out = GPR_SLICE_START_PTR(outbuf); flush = Z_NO_FLUSH; for (i = 0; i < input->count; i++) { if (i == input->count - 1) flush = Z_FINISH; - zs->avail_in = GPR_SLICE_LENGTH(input->slices[i]); + GPR_ASSERT(GPR_SLICE_LENGTH(input->slices[i]) <= uint_max); + zs->avail_in = (uInt)GPR_SLICE_LENGTH(input->slices[i]); zs->next_in = GPR_SLICE_START_PTR(input->slices[i]); do { if (zs->avail_out == 0) { gpr_slice_buffer_add_indexed(output, outbuf); outbuf = gpr_slice_malloc(OUTPUT_BLOCK_SIZE); - zs->avail_out = GPR_SLICE_LENGTH(outbuf); + GPR_ASSERT(GPR_SLICE_LENGTH(outbuf) <= uint_max); + zs->avail_out = (uInt)GPR_SLICE_LENGTH(outbuf); zs->next_out = GPR_SLICE_START_PTR(outbuf); } r = flate(zs, flush); diff --git a/src/core/debug/trace.c b/src/core/debug/trace.c index 1014b1f4db..3b35d81cd8 100644 --- a/src/core/debug/trace.c +++ b/src/core/debug/trace.c @@ -59,9 +59,13 @@ void grpc_register_tracer(const char *name, int *flag) { static void add(const char *beg, const char *end, char ***ss, size_t *ns) { size_t n = *ns; size_t np = n + 1; - char *s = gpr_malloc(end - beg + 1); - memcpy(s, beg, end - beg); - s[end - beg] = 0; + char *s; + size_t len; + GPR_ASSERT(end >= beg); + len = (size_t)(end - beg); + s = gpr_malloc(len + 1); + memcpy(s, beg, len); + s[len] = 0; *ss = gpr_realloc(*ss, sizeof(char **) * np); (*ss)[n] = s; *ns = np; diff --git a/src/core/httpcli/parser.c b/src/core/httpcli/parser.c index 7b2a62060c..404906d5ae 100644 --- a/src/core/httpcli/parser.c +++ b/src/core/httpcli/parser.c @@ -96,13 +96,15 @@ static int add_header(grpc_httpcli_parser *parser) { gpr_log(GPR_ERROR, "Didn't find ':' in header string"); goto error; } - hdr.key = buf2str(beg, cur - beg); + GPR_ASSERT(cur >= beg); + hdr.key = buf2str(beg, (size_t)(cur - beg)); cur++; /* skip : */ while (cur != end && (*cur == ' ' || *cur == '\t')) { cur++; } - hdr.value = buf2str(cur, end - cur - 2); + GPR_ASSERT(end - cur >= 2); + hdr.value = buf2str(cur, (size_t)(end - cur) - 2); if (parser->r.hdr_count == parser->hdr_capacity) { parser->hdr_capacity = @@ -171,7 +173,7 @@ static int addbyte(grpc_httpcli_parser *parser, gpr_uint8 byte) { parser->r.body = gpr_realloc((void *)parser->r.body, parser->body_capacity); } - ((char *)parser->r.body)[parser->r.body_length] = byte; + parser->r.body[parser->r.body_length] = (char)byte; parser->r.body_length++; return 1; } diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index ddb30dc4bb..7b67fe3b1d 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -83,7 +83,7 @@ static gpr_timespec compute_min_deadline(shard_type *shard) { } void grpc_alarm_list_init(gpr_timespec now) { - int i; + gpr_uint32 i; gpr_mu_init(&g_mu); gpr_mu_init(&g_checker_mu); @@ -123,13 +123,13 @@ static size_t shard_idx(const grpc_alarm *info) { } static double ts_to_dbl(gpr_timespec ts) { - return ts.tv_sec + 1e-9 * ts.tv_nsec; + return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; } static gpr_timespec dbl_to_ts(double d) { gpr_timespec ts; - ts.tv_sec = d; - ts.tv_nsec = 1e9 * (d - ts.tv_sec); + ts.tv_sec = (time_t)d; + ts.tv_nsec = (int)(1e9 * (d - (double)ts.tv_sec)); ts.clock_type = GPR_TIMESPAN; return ts; } @@ -145,7 +145,7 @@ static void list_remove(grpc_alarm *alarm) { alarm->prev->next = alarm->next; } -static void swap_adjacent_shards_in_queue(size_t first_shard_queue_index) { +static void swap_adjacent_shards_in_queue(gpr_uint32 first_shard_queue_index) { shard_type *temp; temp = g_shard_queue[first_shard_queue_index]; g_shard_queue[first_shard_queue_index] = @@ -355,7 +355,7 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, gpr_mu_lock(drop_mu); } - return n; + return (int)n; } int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { diff --git a/src/core/iomgr/alarm_heap.c b/src/core/iomgr/alarm_heap.c index daed251982..769142e425 100644 --- a/src/core/iomgr/alarm_heap.c +++ b/src/core/iomgr/alarm_heap.c @@ -43,9 +43,9 @@ position. This functor is called each time immediately after modifying a value in the underlying container, with the offset of the modified element as its argument. */ -static void adjust_upwards(grpc_alarm **first, int i, grpc_alarm *t) { +static void adjust_upwards(grpc_alarm **first, gpr_uint32 i, grpc_alarm *t) { while (i > 0) { - int parent = (i - 1) / 2; + gpr_uint32 parent = (gpr_uint32)(((int)i - 1) / 2); if (gpr_time_cmp(first[parent]->deadline, t->deadline) >= 0) break; first[i] = first[parent]; first[i]->heap_index = i; @@ -58,12 +58,12 @@ static void adjust_upwards(grpc_alarm **first, int i, grpc_alarm *t) { /* Adjusts a heap so as to move a hole at position i farther away from the root, until a suitable position is found for element t. Then, copies t into that position. */ -static void adjust_downwards(grpc_alarm **first, int i, int length, - grpc_alarm *t) { +static void adjust_downwards(grpc_alarm **first, gpr_uint32 i, + gpr_uint32 length, grpc_alarm *t) { for (;;) { - int left_child = 1 + 2 * i; - int right_child; - int next_i; + gpr_uint32 left_child = 1u + 2u * i; + gpr_uint32 right_child; + gpr_uint32 next_i; if (left_child >= length) break; right_child = left_child + 1; next_i = right_child < length && @@ -93,8 +93,8 @@ static void maybe_shrink(grpc_alarm_heap *heap) { } static void note_changed_priority(grpc_alarm_heap *heap, grpc_alarm *alarm) { - int i = alarm->heap_index; - int parent = (i - 1) / 2; + gpr_uint32 i = alarm->heap_index; + gpr_uint32 parent = (gpr_uint32)(((int)i - 1) / 2); if (gpr_time_cmp(heap->alarms[parent]->deadline, alarm->deadline) < 0) { adjust_upwards(heap->alarms, i, alarm); } else { @@ -122,7 +122,7 @@ int grpc_alarm_heap_add(grpc_alarm_heap *heap, grpc_alarm *alarm) { } void grpc_alarm_heap_remove(grpc_alarm_heap *heap, grpc_alarm *alarm) { - int i = alarm->heap_index; + gpr_uint32 i = alarm->heap_index; if (i == heap->alarm_count - 1) { heap->alarm_count--; maybe_shrink(heap); diff --git a/src/core/iomgr/alarm_heap.h b/src/core/iomgr/alarm_heap.h index 60db6c991b..91d6ee3ca2 100644 --- a/src/core/iomgr/alarm_heap.h +++ b/src/core/iomgr/alarm_heap.h @@ -38,8 +38,8 @@ typedef struct { grpc_alarm **alarms; - int alarm_count; - int alarm_capacity; + gpr_uint32 alarm_count; + gpr_uint32 alarm_capacity; } grpc_alarm_heap; /* return 1 if the new alarm is the first alarm in the heap */ diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 2d08a77a70..38a543e36e 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -213,10 +213,9 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, const char *reason) { fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); - REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); + REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ if (!has_watchers(fd)) { - GPR_ASSERT(!fd->closed); fd->closed = 1; close(fd->fd); if (fd->on_done_closure) { diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 1dd03992ae..d6ca5d1f71 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -34,16 +34,18 @@ #include "src/core/iomgr/iomgr.h" #include <stdlib.h> +#include <string.h> -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/alarm_internal.h" -#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/alarm_internal.h" +#include "src/core/support/string.h" + static gpr_mu g_mu; static gpr_cv g_rcv; static grpc_iomgr_closure *g_cbs_head = NULL; @@ -179,6 +181,8 @@ void grpc_iomgr_shutdown(void) { } gpr_mu_unlock(&g_mu); + memset(&g_root_object, 0, sizeof(g_root_object)); + grpc_kick_poller(); gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future(GPR_CLOCK_REALTIME)); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 8f62ce2954..481bdc4ede 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -72,7 +72,7 @@ static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) { to this pollset whilst adding, but that should be benign. */ GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0); if (watcher.fd != NULL) { - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); ev.data.ptr = fd; err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); if (err < 0) { diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 30ee6e24db..cae260cab0 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -101,7 +101,8 @@ static void multipoll_with_poll_pollset_maybe_work( gpr_timespec now, int allow_synchronous_callback) { int timeout; int r; - size_t i, j, pfd_count, fd_count; + size_t i, j, fd_count; + nfds_t pfd_count; pollset_hdr *h; /* TODO(ctiller): inline some elements to avoid an allocation */ grpc_fd_watcher *watchers; @@ -140,8 +141,8 @@ static void multipoll_with_poll_pollset_maybe_work( gpr_mu_unlock(&pollset->mu); for (i = 1; i < pfd_count; i++) { - pfds[i].events = grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN, - POLLOUT, &watchers[i]); + pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN, + POLLOUT, &watchers[i]); } r = grpc_poll_function(pfds, pfd_count, timeout); diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 6bd1b61f24..f3e424e83c 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -187,6 +187,12 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, if (pollset->shutting_down) { goto done; } + if (pollset->in_flight_cbs) { + /* Give do_promote priority so we don't starve it out */ + gpr_mu_unlock(&pollset->mu); + gpr_mu_lock(&pollset->mu); + goto done; + } if (!pollset->kicked_without_pollers) { push_front_worker(pollset, worker); added_worker = 1; @@ -420,14 +426,8 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, grpc_fd_watcher fd_watcher; int timeout; int r; - int nfds; + nfds_t nfds; - if (pollset->in_flight_cbs) { - /* Give do_promote priority so we don't starve it out */ - gpr_mu_unlock(&pollset->mu); - gpr_mu_lock(&pollset->mu); - return; - } fd = pollset->data.ptr; if (fd && grpc_fd_is_orphaned(fd)) { GRPC_FD_UNREF(fd, "basicpoll"); @@ -443,7 +443,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, pfd[1].revents = 0; gpr_mu_unlock(&pollset->mu); pfd[1].events = - grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); + (short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); if (pfd[1].events != 0) { nfds++; } diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h index cc1bd428b0..9f361cb892 100644 --- a/src/core/iomgr/resolve_address.h +++ b/src/core/iomgr/resolve_address.h @@ -40,7 +40,7 @@ typedef struct { char addr[GRPC_MAX_SOCKADDR_SIZE]; - int len; + size_t len; } grpc_resolved_address; typedef struct { diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index efdc480365..0e4bf24549 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -123,15 +123,17 @@ void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, } void grpc_sockaddr_make_wildcard4(int port, struct sockaddr_in *wild_out) { + GPR_ASSERT(port >= 0 && port < 65536); memset(wild_out, 0, sizeof(*wild_out)); wild_out->sin_family = AF_INET; - wild_out->sin_port = htons(port); + wild_out->sin_port = htons((gpr_uint16)port); } void grpc_sockaddr_make_wildcard6(int port, struct sockaddr_in6 *wild_out) { + GPR_ASSERT(port >= 0 && port < 65536); memset(wild_out, 0, sizeof(*wild_out)); wild_out->sin6_family = AF_INET6; - wild_out->sin6_port = htons(port); + wild_out->sin6_port = htons((gpr_uint16)port); } int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, @@ -215,10 +217,12 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) { int grpc_sockaddr_set_port(const struct sockaddr *addr, int port) { switch (addr->sa_family) { case AF_INET: - ((struct sockaddr_in *)addr)->sin_port = htons(port); + GPR_ASSERT(port >= 0 && port < 65536); + ((struct sockaddr_in *)addr)->sin_port = htons((gpr_uint16)port); return 1; case AF_INET6: - ((struct sockaddr_in6 *)addr)->sin6_port = htons(port); + GPR_ASSERT(port >= 0 && port < 65536); + ((struct sockaddr_in6 *)addr)->sin6_port = htons((gpr_uint16)port); return 1; default: gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h index 8ad9b818e1..12296bd55b 100644 --- a/src/core/iomgr/tcp_client.h +++ b/src/core/iomgr/tcp_client.h @@ -46,7 +46,7 @@ in this connection being established (in order to continue their work) */ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), void *arg, grpc_pollset_set *interested_parties, - const struct sockaddr *addr, int addr_len, + const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 66027f87a0..07fa44ad37 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -54,6 +54,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +extern int grpc_tcp_trace; + typedef struct { void (*cb)(void *arg, grpc_endpoint *tcp); void *cb_arg; @@ -92,6 +94,10 @@ error: static void tc_on_alarm(void *acp, int success) { int done; async_connect *ac = acp; + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: success=%d", ac->addr_str, + success); + } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { grpc_fd_shutdown(ac->fd); @@ -116,6 +122,11 @@ static void on_writable(void *acp, int success) { void *cb_arg = ac->cb_arg; grpc_fd *fd; + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: success=%d", + ac->addr_str, success); + } + gpr_mu_lock(&ac->mu); GPR_ASSERT(ac->fd); fd = ac->fd; @@ -195,7 +206,7 @@ finish: void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), void *arg, grpc_pollset_set *interested_parties, - const struct sockaddr *addr, int addr_len, + const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { int fd; grpc_dualstack_mode dsmode; @@ -229,7 +240,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), } do { - err = connect(fd, addr, addr_len); + GPR_ASSERT(addr_len < ~(socklen_t)0); + err = connect(fd, addr, (socklen_t)addr_len); } while (err < 0 && errno == EINTR); addr_str = grpc_sockaddr_to_uri(addr); @@ -263,6 +275,11 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", + ac->addr_str); + } + gpr_mu_lock(&ac->mu); grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 05198dbff4..a42ec7cf11 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -77,7 +77,6 @@ static void on_alarm(void *acp, int occured) { async_connect *ac = acp; gpr_mu_lock(&ac->mu); /* If the alarm didn't occur, it got cancelled. */ - gpr_log(GPR_DEBUG, "on_alarm: %p", ac->socket); if (ac->socket != NULL && occured) { grpc_winsocket_shutdown(ac->socket); } @@ -96,8 +95,6 @@ static void on_connect(void *acp, int from_iocp) { gpr_mu_lock(&ac->mu); - gpr_log(GPR_DEBUG, "on_connect: %p", ac->socket); - if (from_iocp) { DWORD transfered_bytes = 0; DWORD flags; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 0db7cd9f0e..68f469c368 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -61,14 +61,20 @@ #define SENDMSG_FLAGS 0 #endif +#ifdef GPR_MSG_IOVLEN_TYPE +typedef GPR_MSG_IOVLEN_TYPE msg_iovlen_type; +#else +typedef size_t msg_iovlen_type; +#endif + int grpc_tcp_trace = 0; typedef struct { grpc_endpoint base; grpc_fd *em_fd; int fd; - int iov_size; /* Number of slices to allocate per read attempt */ int finished_edge; + msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */ size_t slice_size; gpr_refcount refcount; @@ -215,8 +221,9 @@ static void tcp_continue_read(grpc_tcp *tcp) { } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { - gpr_slice_buffer_trim_end(tcp->incoming_buffer, - tcp->incoming_buffer->length - read_bytes); + gpr_slice_buffer_trim_end( + tcp->incoming_buffer, + tcp->incoming_buffer->length - (size_t)read_bytes); } else if (tcp->iov_size < MAX_READ_IOVEC) { ++tcp->iov_size; } @@ -264,12 +271,12 @@ static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep, static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; - int iov_size; + msg_iovlen_type iov_size; ssize_t sent_length; - ssize_t sending_length; - ssize_t trailing; - ssize_t unwind_slice_idx; - ssize_t unwind_byte_idx; + size_t sending_length; + size_t trailing; + size_t unwind_slice_idx; + size_t unwind_byte_idx; for (;;) { sending_length = 0; @@ -319,9 +326,9 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { } GPR_ASSERT(tcp->outgoing_byte_idx == 0); - trailing = sending_length - sent_length; + trailing = sending_length - (size_t)sent_length; while (trailing > 0) { - ssize_t slice_length; + size_t slice_length; tcp->outgoing_slice_idx--; slice_length = GPR_SLICE_LENGTH( diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 66bb3ef701..5165f5c5ca 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -62,7 +62,7 @@ void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset **pollsets, /* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle all of the multiple socket port matching logic in one place */ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - int addr_len); + size_t addr_len); /* Returns the file descriptor of the Nth listening socket on this server, or -1 if the index is out of bounds. diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 6399aaadb9..a3cd3bbda2 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -83,7 +83,7 @@ typedef struct { struct sockaddr sockaddr; struct sockaddr_un un; } addr; - int addr_len; + size_t addr_len; grpc_iomgr_closure read_closure; grpc_iomgr_closure destroyed_closure; } server_port; @@ -236,7 +236,7 @@ static void init_max_accept_queue_size(void) { char *end; long i = strtol(buf, &end, 10); if (i > 0 && i <= INT_MAX && end && *end == 0) { - n = i; + n = (int)i; } } fclose(fp); @@ -256,7 +256,8 @@ static int get_max_accept_queue_size(void) { } /* Prepare a recently-created socket for listening. */ -static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) { +static int prepare_socket(int fd, const struct sockaddr *addr, + size_t addr_len) { struct sockaddr_storage sockname_temp; socklen_t sockname_len; @@ -273,7 +274,8 @@ static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) { goto error; } - if (bind(fd, addr, addr_len) < 0) { + GPR_ASSERT(addr_len < ~(socklen_t)0); + if (bind(fd, addr, (socklen_t)addr_len) < 0) { char *addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); @@ -337,6 +339,10 @@ static void on_read(void *arg, int success) { addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str); + } + fdobj = grpc_fd_create(fd, name); /* TODO(ctiller): revise this when we have server-side sharding of channels -- we certainly should not be automatically adding every @@ -365,7 +371,7 @@ error: } static int add_socket_to_server(grpc_tcp_server *s, int fd, - const struct sockaddr *addr, int addr_len) { + const struct sockaddr *addr, size_t addr_len) { server_port *sp; int port; char *addr_str; @@ -398,7 +404,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, } int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - int addr_len) { + size_t addr_len) { int allocated_port1 = -1; int allocated_port2 = -1; unsigned i; diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 6429c38b28..7957066598 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -78,7 +78,7 @@ typedef struct { struct sockaddr sockaddr; struct sockaddr_un un; } addr; - int addr_len; + size_t addr_len; grpc_iomgr_closure read_closure; grpc_iomgr_closure destroyed_closure; grpc_udp_server_read_cb read_cb; @@ -94,9 +94,6 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { /* the overall server */ struct grpc_udp_server { - grpc_udp_server_cb cb; - void *cb_arg; - gpr_mu mu; gpr_cv cv; @@ -130,8 +127,6 @@ grpc_udp_server *grpc_udp_server_create(void) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->cb = NULL; - s->cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; @@ -221,7 +216,8 @@ void grpc_udp_server_destroy( } /* Prepare a recently-created socket for listening. */ -static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) { +static int prepare_socket(int fd, const struct sockaddr *addr, + size_t addr_len) { struct sockaddr_storage sockname_temp; socklen_t sockname_len; int get_local_ip; @@ -231,6 +227,11 @@ static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) { goto error; } + if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1)) { + gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, + strerror(errno)); + } + get_local_ip = 1; rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, sizeof(get_local_ip)); @@ -241,7 +242,8 @@ static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) { #endif } - if (bind(fd, addr, addr_len) < 0) { + GPR_ASSERT(addr_len < ~(socklen_t)0); + if (bind(fd, addr, (socklen_t)addr_len) < 0) { char *addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); @@ -280,14 +282,14 @@ static void on_read(void *arg, int success) { /* Tell the registered callback that data is available to read. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(sp->fd, sp->server->cb, sp->server->cb_arg); + sp->read_cb(sp->fd); /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); } static int add_socket_to_server(grpc_udp_server *s, int fd, - const struct sockaddr *addr, int addr_len, + const struct sockaddr *addr, size_t addr_len, grpc_udp_server_read_cb read_cb) { server_port *sp; int port; @@ -299,7 +301,6 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb && "must add ports before starting server"); /* append it to the list under a lock */ if (s->nports == s->port_capacity) { s->port_capacity *= 2; @@ -319,8 +320,8 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, return port; } -int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int addr_len, - grpc_udp_server_read_cb read_cb) { +int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, + size_t addr_len, grpc_udp_server_read_cb read_cb) { int allocated_port1 = -1; int allocated_port2 = -1; unsigned i; @@ -405,15 +406,10 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { } void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, - size_t pollset_count, - grpc_udp_server_cb new_transport_cb, void *cb_arg) { + size_t pollset_count) { size_t i, j; - GPR_ASSERT(new_transport_cb); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb); GPR_ASSERT(s->active_ports == 0); - s->cb = new_transport_cb; - s->cb_arg = cb_arg; s->pollsets = pollsets; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { @@ -430,7 +426,7 @@ void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, /* TODO(rjshade): Add a test for this method. */ void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len, const struct sockaddr *peer_address) { - int rc; + ssize_t rc; rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, sizeof(peer_address)); if (rc < 0) { gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno)); diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index fcc4ba6e97..c930e81cbc 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -39,21 +39,15 @@ /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; -/* New server callback: ep is the newly connected connection */ -typedef void (*grpc_udp_server_cb)(void *arg, grpc_endpoint *ep); - /* Called when data is available to read from the socket. */ -typedef void (*grpc_udp_server_read_cb)(int fd, - grpc_udp_server_cb new_transport_cb, - void *cb_arg); +typedef void (*grpc_udp_server_read_cb)(int fd); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); /* Start listening to bound ports */ -void grpc_udp_server_start(grpc_udp_server *server, grpc_pollset **pollsets, - size_t pollset_count, grpc_udp_server_cb cb, - void *cb_arg); +void grpc_udp_server_start(grpc_udp_server *udp_server, grpc_pollset **pollsets, + size_t pollset_count); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); @@ -67,8 +61,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); /* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle all of the multiple socket port matching logic in one place */ -int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int addr_len, - grpc_udp_server_read_cb read_cb); +int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, + size_t addr_len, grpc_udp_server_read_cb read_cb); void grpc_udp_server_destroy(grpc_udp_server *server, void (*shutdown_done)(void *shutdown_done_arg), diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c index bd643e8061..902034ee4b 100644 --- a/src/core/iomgr/wakeup_fd_pipe.c +++ b/src/core/iomgr/wakeup_fd_pipe.c @@ -56,7 +56,7 @@ static void pipe_init(grpc_wakeup_fd *fd_info) { static void pipe_consume(grpc_wakeup_fd *fd_info) { char buf[128]; - int r; + ssize_t r; for (;;) { r = read(fd_info->read_fd, buf, sizeof(buf)); diff --git a/src/core/security/base64.c b/src/core/security/base64.c index 8dfaef846f..5226d2c578 100644 --- a/src/core/security/base64.c +++ b/src/core/security/base64.c @@ -125,13 +125,14 @@ gpr_slice grpc_base64_decode(const char *b64, int url_safe) { static void decode_one_char(const unsigned char *codes, unsigned char *result, size_t *result_offset) { - gpr_uint32 packed = (codes[0] << 2) | (codes[1] >> 4); + gpr_uint32 packed = ((gpr_uint32)codes[0] << 2) | ((gpr_uint32)codes[1] >> 4); result[(*result_offset)++] = (unsigned char)packed; } static void decode_two_chars(const unsigned char *codes, unsigned char *result, size_t *result_offset) { - gpr_uint32 packed = (codes[0] << 10) | (codes[1] << 4) | (codes[2] >> 2); + gpr_uint32 packed = ((gpr_uint32)codes[0] << 10) | + ((gpr_uint32)codes[1] << 4) | ((gpr_uint32)codes[2] >> 2); result[(*result_offset)++] = (unsigned char)(packed >> 8); result[(*result_offset)++] = (unsigned char)(packed); } @@ -171,8 +172,9 @@ static int decode_group(const unsigned char *codes, size_t num_codes, decode_two_chars(codes, result, result_offset); } else { /* No padding. */ - gpr_uint32 packed = - (codes[0] << 18) | (codes[1] << 12) | (codes[2] << 6) | codes[3]; + gpr_uint32 packed = ((gpr_uint32)codes[0] << 18) | + ((gpr_uint32)codes[1] << 12) | + ((gpr_uint32)codes[2] << 6) | codes[3]; result[(*result_offset)++] = (unsigned char)(packed >> 16); result[(*result_offset)++] = (unsigned char)(packed >> 8); result[(*result_offset)++] = (unsigned char)(packed); diff --git a/src/core/security/jwt_verifier.c b/src/core/security/jwt_verifier.c index 38ad134a6a..790f2178db 100644 --- a/src/core/security/jwt_verifier.c +++ b/src/core/security/jwt_verifier.c @@ -33,6 +33,7 @@ #include "src/core/security/jwt_verifier.h" +#include <limits.h> #include <string.h> #include "src/core/httpcli/httpcli.h" @@ -412,7 +413,9 @@ static EVP_PKEY *extract_pkey_from_x509(const char *x509_str) { X509 *x509 = NULL; EVP_PKEY *result = NULL; BIO *bio = BIO_new(BIO_s_mem()); - BIO_write(bio, x509_str, strlen(x509_str)); + size_t len = strlen(x509_str); + GPR_ASSERT(len < INT_MAX); + BIO_write(bio, x509_str, (int)len); x509 = PEM_read_bio_X509(bio, NULL, NULL, NULL); if (x509 == NULL) { gpr_log(GPR_ERROR, "Unable to parse x509 cert."); @@ -439,7 +442,8 @@ static BIGNUM *bignum_from_base64(const char *b64) { gpr_log(GPR_ERROR, "Invalid base64 for big num."); return NULL; } - result = BN_bin2bn(GPR_SLICE_START_PTR(bin), GPR_SLICE_LENGTH(bin), NULL); + result = + BN_bin2bn(GPR_SLICE_START_PTR(bin), (int)GPR_SLICE_LENGTH(bin), NULL); gpr_slice_unref(bin); return result; } @@ -769,7 +773,7 @@ void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier, GPR_ASSERT(verifier != NULL && jwt != NULL && audience != NULL && cb != NULL); dot = strchr(cur, '.'); if (dot == NULL) goto error; - json = parse_json_part_from_jwt(cur, dot - cur, &header_buffer); + json = parse_json_part_from_jwt(cur, (size_t)(dot - cur), &header_buffer); if (json == NULL) goto error; header = jose_header_from_json(json, header_buffer); if (header == NULL) goto error; @@ -777,7 +781,7 @@ void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier, cur = dot + 1; dot = strchr(cur, '.'); if (dot == NULL) goto error; - json = parse_json_part_from_jwt(cur, dot - cur, &claims_buffer); + json = parse_json_part_from_jwt(cur, (size_t)(dot - cur), &claims_buffer); if (json == NULL) goto error; claims = grpc_jwt_claims_from_json(json, claims_buffer); if (claims == NULL) goto error; diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index b767f85498..d134201e87 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -128,9 +128,11 @@ static void on_md_processing_done( calld->num_consumed_md = num_consumed_md; grpc_metadata_batch_filter(&calld->md_op->data.metadata, remove_consumed_md, elem); + grpc_metadata_array_destroy(&calld->md); calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1); } else { gpr_slice message; + grpc_metadata_array_destroy(&calld->md); error_details = error_details != NULL ? error_details : "Authentication metadata processing failed."; @@ -139,7 +141,6 @@ static void on_md_processing_done( grpc_transport_stream_op_add_close(&calld->transport_op, status, &message); grpc_call_next_op(elem, &calld->transport_op); } - grpc_metadata_array_destroy(&calld->md); } static void auth_on_recv(void *user_data, int success) { diff --git a/src/core/support/cpu_posix.c b/src/core/support/cpu_posix.c index 99484e37fb..55d92c0555 100644 --- a/src/core/support/cpu_posix.c +++ b/src/core/support/cpu_posix.c @@ -44,11 +44,11 @@ static __thread char magic_thread_local; -static int ncpus = 0; +static long ncpus = 0; static void init_ncpus() { ncpus = sysconf(_SC_NPROCESSORS_ONLN); - if (ncpus < 1) { + if (ncpus < 1 || ncpus > GPR_UINT32_MAX) { gpr_log(GPR_ERROR, "Cannot determine number of CPUs: assuming 1"); ncpus = 1; } @@ -57,7 +57,7 @@ static void init_ncpus() { unsigned gpr_cpu_num_cores(void) { static gpr_once once = GPR_ONCE_INIT; gpr_once_init(&once, init_ncpus); - return ncpus; + return (unsigned)ncpus; } /* This is a cheap, but good enough, pointer hash for sharding things: */ @@ -71,7 +71,7 @@ unsigned gpr_cpu_current_cpu(void) { most code that's using this is using it to shard across work queues though, so here we use thread identity instead to achieve a similar though not identical effect */ - return shard_ptr(&magic_thread_local); + return (unsigned)shard_ptr(&magic_thread_local); } #endif /* GPR_CPU_POSIX */ diff --git a/src/core/support/log_posix.c b/src/core/support/log_posix.c index 940ee20f15..8b050dbee7 100644 --- a/src/core/support/log_posix.c +++ b/src/core/support/log_posix.c @@ -62,9 +62,9 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, } else if ((size_t)ret <= sizeof(buf) - 1) { message = buf; } else { - message = allocated = gpr_malloc(ret + 1); + message = allocated = gpr_malloc((size_t)ret + 1); va_start(args, format); - vsnprintf(message, ret + 1, format, args); + vsnprintf(message, (size_t)(ret + 1), format, args); va_end(args); } gpr_log_message(file, line, severity, message); diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c index 6482ef9c9f..8873d459d5 100644 --- a/src/core/support/slice_buffer.c +++ b/src/core/support/slice_buffer.c @@ -69,7 +69,7 @@ void gpr_slice_buffer_destroy(gpr_slice_buffer *sb) { } } -gpr_uint8 *gpr_slice_buffer_tiny_add(gpr_slice_buffer *sb, unsigned n) { +gpr_uint8 *gpr_slice_buffer_tiny_add(gpr_slice_buffer *sb, size_t n) { gpr_slice *back; gpr_uint8 *out; diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c index 27ecf62280..180ba19c68 100644 --- a/src/core/support/stack_lockfree.c +++ b/src/core/support/stack_lockfree.c @@ -79,7 +79,7 @@ struct gpr_stack_lockfree { #endif }; -gpr_stack_lockfree *gpr_stack_lockfree_create(int entries) { +gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) { gpr_stack_lockfree *stack; stack = gpr_malloc(sizeof(*stack)); /* Since we only allocate 16 bits to represent an entry number, @@ -123,13 +123,13 @@ int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { #ifndef NDEBUG /* Check for double push */ { - int pushed_index = entry / (8 * sizeof(gpr_atm)); - int pushed_bit = entry % (8 * sizeof(gpr_atm)); + int pushed_index = entry / (int)(8 * sizeof(gpr_atm)); + int pushed_bit = entry % (int)(8 * sizeof(gpr_atm)); gpr_atm old_val; old_val = gpr_atm_no_barrier_fetch_add(&stack->pushed[pushed_index], (gpr_atm)(1UL << pushed_bit)); - GPR_ASSERT((old_val & (1UL << pushed_bit)) == 0); + GPR_ASSERT((old_val & (gpr_atm)(1UL << pushed_bit)) == 0); } #endif @@ -167,7 +167,7 @@ int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { old_val = gpr_atm_no_barrier_fetch_add(&stack->pushed[pushed_index], -(gpr_atm)(1UL << pushed_bit)); - GPR_ASSERT((old_val & (1UL << pushed_bit)) != 0); + GPR_ASSERT((old_val & (gpr_atm)(1UL << pushed_bit)) != 0); } #endif diff --git a/src/core/support/stack_lockfree.h b/src/core/support/stack_lockfree.h index eec960fbb0..2bbbe3bd95 100644 --- a/src/core/support/stack_lockfree.h +++ b/src/core/support/stack_lockfree.h @@ -34,11 +34,13 @@ #ifndef GRPC_INTERNAL_CORE_SUPPORT_STACK_LOCKFREE_H #define GRPC_INTERNAL_CORE_SUPPORT_STACK_LOCKFREE_H +#include <stddef.h> + typedef struct gpr_stack_lockfree gpr_stack_lockfree; /* This stack must specify the maximum number of entries to track. The current implementation only allows up to 65534 entries */ -gpr_stack_lockfree* gpr_stack_lockfree_create(int entries); +gpr_stack_lockfree* gpr_stack_lockfree_create(size_t entries); void gpr_stack_lockfree_destroy(gpr_stack_lockfree* stack); /* Pass in a valid entry number for the next stack entry */ diff --git a/src/core/support/string.c b/src/core/support/string.c index af0389ea83..e0ffeb8a4a 100644 --- a/src/core/support/string.c +++ b/src/core/support/string.c @@ -101,7 +101,7 @@ static void asciidump(dump_out *out, const char *buf, size_t len) { dump_out_append(out, '\''); } for (cur = beg; cur != end; ++cur) { - dump_out_append(out, isprint(*cur) ? *(char *)cur : '.'); + dump_out_append(out, (char)(isprint(*cur) ? *(char *)cur : '.')); } if (!out_was_empty) { dump_out_append(out, '\''); diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index a274400243..dcecff0d05 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -108,8 +108,8 @@ gpr_timespec gpr_now(gpr_clock_type clock) { break; case GPR_CLOCK_MONOTONIC: now_dbl = (mach_absolute_time() - g_time_start) * g_time_scale; - now.tv_sec = now_dbl * 1e-9; - now.tv_nsec = now_dbl - now.tv_sec * 1e9; + now.tv_sec = (time_t)(now_dbl * 1e-9); + now.tv_nsec = (int)(now_dbl - ((double)now.tv_sec) * 1e9); break; case GPR_CLOCK_PRECISE: gpr_precise_clock_now(&now); diff --git a/src/core/surface/call.c b/src/core/surface/call.c index a8b4d65fbc..4168c2ef0c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -61,18 +61,6 @@ - status/close recv (depending on client/server) */ #define MAX_CONCURRENT_COMPLETIONS 6 -typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; - -typedef enum { - SEND_NOTHING, - SEND_INITIAL_METADATA, - SEND_BUFFERED_INITIAL_METADATA, - SEND_MESSAGE, - SEND_BUFFERED_MESSAGE, - SEND_TRAILING_METADATA_AND_FINISH, - SEND_FINISH -} send_action; - typedef struct { grpc_ioreq_completion_func on_complete; void *user_data; @@ -433,7 +421,7 @@ static grpc_cq_completion *allocate_completion(grpc_call *call) { if (call->allocated_completions & (1u << i)) { continue; } - call->allocated_completions |= 1u << i; + call->allocated_completions |= (gpr_uint8)(1u << i); gpr_mu_unlock(&call->completion_mu); return &call->completions[i]; } @@ -444,7 +432,8 @@ static grpc_cq_completion *allocate_completion(grpc_call *call) { static void done_completion(void *call, grpc_cq_completion *completion) { grpc_call *c = call; gpr_mu_lock(&c->completion_mu); - c->allocated_completions &= ~(1u << (completion - c->completions)); + c->allocated_completions &= + (gpr_uint8) ~(1u << (completion - c->completions)); gpr_mu_unlock(&c->completion_mu); GRPC_CALL_INTERNAL_UNREF(c, "completion", 1); } @@ -520,7 +509,7 @@ static void set_status_code(grpc_call *call, status_source source, if (call->status[source].is_set) return; call->status[source].is_set = 1; - call->status[source].code = status; + call->status[source].code = (grpc_status_code)status; call->error_status_set = status != GRPC_STATUS_OK; if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) { @@ -545,7 +534,7 @@ static void set_encodings_accepted_by_peer( gpr_slice_buffer accept_encoding_parts; gpr_slice_buffer_init(&accept_encoding_parts); - gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts); + gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already * zeroes the whole grpc_call */ @@ -616,7 +605,7 @@ static void unlock(grpc_call *call) { int completing_requests = 0; int start_op = 0; int i; - const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536; + const size_t MAX_RECV_PEEK_AHEAD = 65536; size_t buffered_bytes; int cancel_alarm = 0; @@ -755,7 +744,7 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, size_t i; /* ioreq is live: we need to do something */ master = &call->masters[master_set]; - master->complete_mask |= 1u << op; + master->complete_mask |= (gpr_uint16)(1u << op); if (!success) { master->success = 0; } @@ -1119,10 +1108,12 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) { /* fall through intended */ case WRITE_STATE_STARTED: if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { + size_t length; data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE]; - grpc_sopb_add_begin_message( - &call->send_ops, grpc_byte_buffer_length(data.send_message), flags); + length = grpc_byte_buffer_length(data.send_message); + GPR_ASSERT(length <= GPR_UINT32_MAX); + grpc_sopb_add_begin_message(&call->send_ops, (gpr_uint32)length, flags); copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops); op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE; @@ -1224,7 +1215,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, grpc_ioreq_completion_func completion, void *user_data) { size_t i; - gpr_uint32 have_ops = 0; + gpr_uint16 have_ops = 0; grpc_ioreq_op op; reqinfo_master *master; grpc_ioreq_data data; @@ -1255,13 +1246,13 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, } if (op == GRPC_IOREQ_SEND_STATUS) { set_status_code(call, STATUS_FROM_SERVER_STATUS, - reqs[i].data.send_status.code); + (gpr_uint32)reqs[i].data.send_status.code); if (reqs[i].data.send_status.details) { set_status_details(call, STATUS_FROM_SERVER_STATUS, GRPC_MDSTR_REF(reqs[i].data.send_status.details)); } } - have_ops |= 1u << op; + have_ops |= (gpr_uint16)(1u << op); call->request_data[op] = data; call->request_flags[op] = reqs[i].flags; @@ -1345,7 +1336,7 @@ static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, GPR_ASSERT(status != GRPC_STATUS_OK); - set_status_code(c, STATUS_FROM_API_OVERRIDE, status); + set_status_code(c, STATUS_FROM_API_OVERRIDE, (gpr_uint32)status); set_status_details(c, STATUS_FROM_API_OVERRIDE, details); c->cancel_with_status = status; diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 586402e21c..a89523b3ab 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -113,7 +113,7 @@ grpc_channel *grpc_channel_create_from_filters( grpc_mdstr_from_string(mdctx, "grpc-message", 0); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { char buf[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(i, buf); + gpr_ltoa((long)i, buf); channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings( mdctx, GRPC_MDSTR_REF(channel->grpc_status_string), grpc_mdstr_from_string(mdctx, buf, 0)); @@ -134,7 +134,7 @@ grpc_channel *grpc_channel_create_from_filters( gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", GRPC_ARG_MAX_MESSAGE_LENGTH); } else { - channel->max_message_length = args->args[i].value.integer; + channel->max_message_length = (gpr_uint32)args->args[i].value.integer; } } else if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { if (args->args[i].type != GRPC_ARG_STRING) { @@ -193,7 +193,7 @@ static grpc_call *grpc_channel_create_call_internal( grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { grpc_mdelem *send_metadata[2]; - int num_metadata = 0; + size_t num_metadata = 0; GPR_ASSERT(channel->is_client); diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 707251da89..d323d0d74f 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -88,6 +88,8 @@ static void connected(void *arg, grpc_endpoint *tcp) { grpc_iomgr_add_callback(notify); } +static void connector_shutdown(grpc_connector *con) {} + static void connector_connect(grpc_connector *con, const grpc_connect_in_args *args, grpc_connect_out_args *result, @@ -103,7 +105,7 @@ static void connector_connect(grpc_connector *con, } static const grpc_connector_vtable connector_vtable = { - connector_ref, connector_unref, connector_connect}; + connector_ref, connector_unref, connector_shutdown, connector_connect}; typedef struct { grpc_subchannel_factory base; @@ -164,7 +166,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_resolver *resolver; subchannel_factory *f; grpc_mdctx *mdctx = grpc_mdctx_create(); - int n = 0; + size_t n = 0; GPR_ASSERT(!reserved); if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 0d48cd42d7..03bd026a42 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -40,6 +40,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/time.h> #include "src/core/channel/channel_stack.h" +#include "src/core/client_config/lb_policy_registry.h" +#include "src/core/client_config/lb_policies/pick_first.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/client_config/resolvers/sockaddr_resolver.h" @@ -85,6 +87,8 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); + grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create()); + grpc_register_lb_policy(grpc_pick_first_lb_factory_create()); grpc_resolver_registry_init("dns:///"); grpc_register_resolver_type(grpc_dns_resolver_factory_create()); grpc_register_resolver_type(grpc_ipv4_resolver_factory_create()); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 35b60bdbef..52c5e93988 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -61,6 +61,9 @@ typedef struct { grpc_iomgr_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; + + gpr_mu mu; + grpc_endpoint *connecting_endpoint; } connector; static void connector_ref(grpc_connector *con) { @@ -81,10 +84,20 @@ static void on_secure_transport_setup_done(void *arg, grpc_endpoint *secure_endpoint) { connector *c = arg; grpc_iomgr_closure *notify; - if (status != GRPC_SECURITY_OK) { + gpr_mu_lock(&c->mu); + if (c->connecting_endpoint == NULL) { + memset(c->result, 0, sizeof(*c->result)); + gpr_mu_unlock(&c->mu); + } else if (status != GRPC_SECURITY_OK) { + GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint); gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); memset(c->result, 0, sizeof(*c->result)); + c->connecting_endpoint = NULL; + gpr_mu_unlock(&c->mu); } else { + GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint); + c->connecting_endpoint = NULL; + gpr_mu_unlock(&c->mu); c->result->transport = grpc_create_chttp2_transport( c->args.channel_args, secure_endpoint, c->args.metadata_context, 1); grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); @@ -102,6 +115,10 @@ static void connected(void *arg, grpc_endpoint *tcp) { connector *c = arg; grpc_iomgr_closure *notify; if (tcp != NULL) { + gpr_mu_lock(&c->mu); + GPR_ASSERT(c->connecting_endpoint == NULL); + c->connecting_endpoint = tcp; + gpr_mu_unlock(&c->mu); grpc_setup_secure_transport(&c->security_connector->base, tcp, on_secure_transport_setup_done, c); } else { @@ -112,6 +129,18 @@ static void connected(void *arg, grpc_endpoint *tcp) { } } +static void connector_shutdown(grpc_connector *con) { + connector *c = (connector *)con; + grpc_endpoint *ep; + gpr_mu_lock(&c->mu); + ep = c->connecting_endpoint; + c->connecting_endpoint = NULL; + gpr_mu_unlock(&c->mu); + if (ep) { + grpc_endpoint_shutdown(ep); + } +} + static void connector_connect(grpc_connector *con, const grpc_connect_in_args *args, grpc_connect_out_args *result, @@ -122,12 +151,15 @@ static void connector_connect(grpc_connector *con, c->notify = notify; c->args = *args; c->result = result; + gpr_mu_lock(&c->mu); + GPR_ASSERT(c->connecting_endpoint == NULL); + gpr_mu_unlock(&c->mu); grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, args->addr_len, args->deadline); } static const grpc_connector_vtable connector_vtable = { - connector_ref, connector_unref, connector_connect}; + connector_ref, connector_unref, connector_shutdown, connector_connect}; typedef struct { grpc_subchannel_factory base; @@ -197,7 +229,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, subchannel_factory *f; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; - int n = 0; + size_t n = 0; GPR_ASSERT(reserved == NULL); if (grpc_find_security_connector_in_args(args) != NULL) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 292bf6fab8..3d404f78a4 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -33,6 +33,7 @@ #include "src/core/surface/server.h" +#include <limits.h> #include <stdlib.h> #include <string.h> @@ -203,7 +204,7 @@ struct grpc_server { gpr_stack_lockfree *request_freelist; /** requested call backing data */ requested_call *requested_calls; - int max_requested_calls; + size_t max_requested_calls; gpr_atm shutdown_flag; gpr_uint8 shutdown_published; @@ -298,7 +299,7 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, */ static void request_matcher_init(request_matcher *request_matcher, - int entries) { + size_t entries) { memset(request_matcher, 0, sizeof(*request_matcher)); request_matcher->requests = gpr_stack_lockfree_create(entries); } @@ -804,7 +805,7 @@ grpc_server *grpc_server_create_from_filters( server->request_freelist = gpr_stack_lockfree_create(server->max_requested_calls); for (i = 0; i < (size_t)server->max_requested_calls; i++) { - gpr_stack_lockfree_push(server->request_freelist, i); + gpr_stack_lockfree_push(server->request_freelist, (int)i); } request_matcher_init(&server->unregistered_request_matcher, server->max_requested_calls); @@ -817,7 +818,7 @@ grpc_server *grpc_server_create_from_filters( grpc_server_census_filter (optional) - for stats collection and tracing {passed in filter stack} grpc_connected_channel_filter - for interfacing with transports */ - server->channel_filter_count = filter_count + 1 + census_enabled; + server->channel_filter_count = filter_count + 1u + (census_enabled ? 1u : 0u); server->channel_filters = gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *)); server->channel_filters[0] = &server_surface_filter; @@ -825,7 +826,7 @@ grpc_server *grpc_server_create_from_filters( server->channel_filters[1] = &grpc_server_census_filter; } for (i = 0; i < filter_count; i++) { - server->channel_filters[i + 1 + census_enabled] = filters[i]; + server->channel_filters[i + 1u + (census_enabled ? 1u : 0u)] = filters[i]; } server->channel_args = grpc_channel_args_copy(args); @@ -896,7 +897,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_mdstr *host; grpc_mdstr *method; gpr_uint32 hash; - gpr_uint32 slots; + size_t slots; gpr_uint32 probes; gpr_uint32 max_probes = 0; grpc_transport_op op; @@ -949,7 +950,8 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, crm->host = host; crm->method = method; } - chand->registered_method_slots = slots; + GPR_ASSERT(slots <= GPR_UINT32_MAX); + chand->registered_method_slots = (gpr_uint32)slots; chand->registered_method_max_probes = max_probes; } @@ -970,7 +972,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, op.set_accept_stream_user_data = chand; op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; - op.disconnect = gpr_atm_acq_load(&s->shutdown_flag); + op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0; grpc_transport_perform_op(transport, &op); } @@ -1246,7 +1248,8 @@ static void begin_call(grpc_server *server, call_data *calld, } GRPC_CALL_INTERNAL_REF(calld->call, "server"); - grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc); + grpc_call_start_ioreq_and_call_back(calld->call, req, (size_t)(r - req), + publish, rc); } static void done_request_event(void *req, grpc_cq_completion *c) { @@ -1255,8 +1258,9 @@ static void done_request_event(void *req, grpc_cq_completion *c) { if (rc >= server->requested_calls && rc < server->requested_calls + server->max_requested_calls) { + GPR_ASSERT(rc - server->requested_calls <= INT_MAX); gpr_stack_lockfree_push(server->request_freelist, - rc - server->requested_calls); + (int)(rc - server->requested_calls)); } else { gpr_free(req); } diff --git a/src/core/transport/chttp2/bin_encoder.c b/src/core/transport/chttp2/bin_encoder.c index dee6dbec8b..e21d800083 100644 --- a/src/core/transport/chttp2/bin_encoder.c +++ b/src/core/transport/chttp2/bin_encoder.c @@ -68,7 +68,7 @@ gpr_slice grpc_chttp2_base64_encode(gpr_slice input) { size_t output_length = input_triplets * 4 + tail_xtra[tail_case]; gpr_slice output = gpr_slice_malloc(output_length); gpr_uint8 *in = GPR_SLICE_START_PTR(input); - gpr_uint8 *out = GPR_SLICE_START_PTR(output); + char *out = (char *)GPR_SLICE_START_PTR(output); size_t i; /* encode full triplets */ @@ -100,7 +100,7 @@ gpr_slice grpc_chttp2_base64_encode(gpr_slice input) { break; } - GPR_ASSERT(out == GPR_SLICE_END_PTR(output)); + GPR_ASSERT(out == (char *)GPR_SLICE_END_PTR(output)); GPR_ASSERT(in == GPR_SLICE_END_PTR(input)); return output; } @@ -128,12 +128,13 @@ gpr_slice grpc_chttp2_huffman_compress(gpr_slice input) { while (temp_length > 8) { temp_length -= 8; - *out++ = temp >> temp_length; + *out++ = (gpr_uint8)(temp >> temp_length); } } if (temp_length) { - *out++ = (temp << (8 - temp_length)) | (0xff >> temp_length); + *out++ = (gpr_uint8)(temp << (8u - temp_length)) | + (gpr_uint8)(0xffu >> temp_length); } GPR_ASSERT(out == GPR_SLICE_END_PTR(output)); @@ -150,16 +151,16 @@ typedef struct { static void enc_flush_some(huff_out *out) { while (out->temp_length > 8) { out->temp_length -= 8; - *out->out++ = out->temp >> out->temp_length; + *out->out++ = (gpr_uint8)(out->temp >> out->temp_length); } } static void enc_add2(huff_out *out, gpr_uint8 a, gpr_uint8 b) { b64_huff_sym sa = huff_alphabet[a]; b64_huff_sym sb = huff_alphabet[b]; - out->temp = - (out->temp << (sa.length + sb.length)) | (sa.bits << sb.length) | sb.bits; - out->temp_length += sa.length + sb.length; + out->temp = (out->temp << (sa.length + sb.length)) | + ((gpr_uint32)sa.bits << sb.length) | sb.bits; + out->temp_length += (gpr_uint32)sa.length + (gpr_uint32)sb.length; enc_flush_some(out); } @@ -189,8 +190,9 @@ gpr_slice grpc_chttp2_base64_encode_and_huffman_compress(gpr_slice input) { /* encode full triplets */ for (i = 0; i < input_triplets; i++) { - enc_add2(&out, in[0] >> 2, ((in[0] & 0x3) << 4) | (in[1] >> 4)); - enc_add2(&out, ((in[1] & 0xf) << 2) | (in[2] >> 6), in[2] & 0x3f); + enc_add2(&out, in[0] >> 2, (gpr_uint8)((in[0] & 0x3) << 4) | (in[1] >> 4)); + enc_add2(&out, (gpr_uint8)((in[1] & 0xf) << 2) | (in[2] >> 6), + (gpr_uint8)(in[2] & 0x3f)); in += 3; } @@ -199,19 +201,20 @@ gpr_slice grpc_chttp2_base64_encode_and_huffman_compress(gpr_slice input) { case 0: break; case 1: - enc_add2(&out, in[0] >> 2, (in[0] & 0x3) << 4); + enc_add2(&out, in[0] >> 2, (gpr_uint8)((in[0] & 0x3) << 4)); in += 1; break; case 2: - enc_add2(&out, in[0] >> 2, ((in[0] & 0x3) << 4) | (in[1] >> 4)); - enc_add1(&out, (in[1] & 0xf) << 2); + enc_add2(&out, in[0] >> 2, + (gpr_uint8)((in[0] & 0x3) << 4) | (gpr_uint8)(in[1] >> 4)); + enc_add1(&out, (gpr_uint8)((in[1] & 0xf) << 2)); in += 2; break; } if (out.temp_length) { - *out.out++ = - (out.temp << (8 - out.temp_length)) | (0xff >> out.temp_length); + *out.out++ = (gpr_uint8)(out.temp << (8u - out.temp_length)) | + (gpr_uint8)(0xffu >> out.temp_length); } GPR_ASSERT(out.out <= GPR_SLICE_END_PTR(output)); diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 474c3d5ee6..403358016d 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -146,20 +146,23 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); if ((gpr_uint32)(end - cur) == p->frame_size) { - grpc_sopb_add_slice(&p->incoming_sopb, - gpr_slice_sub(slice, cur - beg, end - beg)); + grpc_sopb_add_slice( + &p->incoming_sopb, + gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; } else if ((gpr_uint32)(end - cur) > p->frame_size) { - grpc_sopb_add_slice( - &p->incoming_sopb, - gpr_slice_sub(slice, cur - beg, cur + p->frame_size - beg)); + grpc_sopb_add_slice(&p->incoming_sopb, + gpr_slice_sub(slice, (size_t)(cur - beg), + (size_t)(cur + p->frame_size - beg))); cur += p->frame_size; goto fh_0; /* loop */ } else { - grpc_sopb_add_slice(&p->incoming_sopb, - gpr_slice_sub(slice, cur - beg, end - beg)); - p->frame_size -= (end - cur); + grpc_sopb_add_slice( + &p->incoming_sopb, + gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + GPR_ASSERT(end - cur <= p->frame_size); + p->frame_size -= (gpr_uint32)(end - cur); return GRPC_CHTTP2_PARSE_OK; } } diff --git a/src/core/transport/chttp2/frame_goaway.c b/src/core/transport/chttp2/frame_goaway.c index 1ccbba840c..09d4da234c 100644 --- a/src/core/transport/chttp2/frame_goaway.c +++ b/src/core/transport/chttp2/frame_goaway.c @@ -136,14 +136,15 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( ++cur; /* fallthrough */ case GRPC_CHTTP2_GOAWAY_DEBUG: - memcpy(p->debug_data + p->debug_pos, cur, end - cur); - p->debug_pos += end - cur; + memcpy(p->debug_data + p->debug_pos, cur, (size_t)(end - cur)); + GPR_ASSERT(end - cur < GPR_UINT32_MAX - p->debug_pos); + p->debug_pos += (gpr_uint32)(end - cur); p->state = GRPC_CHTTP2_GOAWAY_DEBUG; if (is_last) { transport_parsing->goaway_received = 1; transport_parsing->goaway_last_stream_index = p->last_stream_id; gpr_slice_unref(transport_parsing->goaway_text); - transport_parsing->goaway_error = p->error_code; + transport_parsing->goaway_error = (grpc_status_code)p->error_code; transport_parsing->goaway_text = gpr_slice_new(p->debug_data, p->debug_length, gpr_free); p->debug_data = NULL; @@ -160,12 +161,14 @@ void grpc_chttp2_goaway_append(gpr_uint32 last_stream_id, gpr_uint32 error_code, gpr_slice_buffer *slice_buffer) { gpr_slice header = gpr_slice_malloc(9 + 4 + 4); gpr_uint8 *p = GPR_SLICE_START_PTR(header); - gpr_uint32 frame_length = 4 + 4 + GPR_SLICE_LENGTH(debug_data); + gpr_uint32 frame_length; + GPR_ASSERT(GPR_SLICE_LENGTH(debug_data) < GPR_UINT32_MAX - 4 - 4); + frame_length = 4 + 4 + (gpr_uint32)GPR_SLICE_LENGTH(debug_data); /* frame header: length */ - *p++ = frame_length >> 16; - *p++ = frame_length >> 8; - *p++ = frame_length; + *p++ = (gpr_uint8)(frame_length >> 16); + *p++ = (gpr_uint8)(frame_length >> 8); + *p++ = (gpr_uint8)(frame_length); /* frame header: type */ *p++ = GRPC_CHTTP2_FRAME_GOAWAY; /* frame header: flags */ @@ -176,15 +179,15 @@ void grpc_chttp2_goaway_append(gpr_uint32 last_stream_id, gpr_uint32 error_code, *p++ = 0; *p++ = 0; /* payload: last stream id */ - *p++ = last_stream_id >> 24; - *p++ = last_stream_id >> 16; - *p++ = last_stream_id >> 8; - *p++ = last_stream_id; + *p++ = (gpr_uint8)(last_stream_id >> 24); + *p++ = (gpr_uint8)(last_stream_id >> 16); + *p++ = (gpr_uint8)(last_stream_id >> 8); + *p++ = (gpr_uint8)(last_stream_id); /* payload: error code */ - *p++ = error_code >> 24; - *p++ = error_code >> 16; - *p++ = error_code >> 8; - *p++ = error_code; + *p++ = (gpr_uint8)(error_code >> 24); + *p++ = (gpr_uint8)(error_code >> 16); + *p++ = (gpr_uint8)(error_code >> 8); + *p++ = (gpr_uint8)(error_code); GPR_ASSERT(p == GPR_SLICE_END_PTR(header)); gpr_slice_buffer_add(slice_buffer, header); gpr_slice_buffer_add(slice_buffer, debug_data); diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c index a878d936c1..67da245239 100644 --- a/src/core/transport/chttp2/frame_rst_stream.c +++ b/src/core/transport/chttp2/frame_rst_stream.c @@ -47,14 +47,14 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) { *p++ = 4; *p++ = GRPC_CHTTP2_FRAME_RST_STREAM; *p++ = 0; - *p++ = id >> 24; - *p++ = id >> 16; - *p++ = id >> 8; - *p++ = id; - *p++ = code >> 24; - *p++ = code >> 16; - *p++ = code >> 8; - *p++ = code; + *p++ = (gpr_uint8)(id >> 24); + *p++ = (gpr_uint8)(id >> 16); + *p++ = (gpr_uint8)(id >> 8); + *p++ = (gpr_uint8)(id); + *p++ = (gpr_uint8)(code >> 24); + *p++ = (gpr_uint8)(code >> 16); + *p++ = (gpr_uint8)(code >> 8); + *p++ = (gpr_uint8)(code); return slice; } diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c index d42bc000ae..54d3694a5c 100644 --- a/src/core/transport/chttp2/frame_settings.c +++ b/src/core/transport/chttp2/frame_settings.c @@ -61,9 +61,9 @@ const grpc_chttp2_setting_parameters static gpr_uint8 *fill_header(gpr_uint8 *out, gpr_uint32 length, gpr_uint8 flags) { - *out++ = length >> 16; - *out++ = length >> 8; - *out++ = length; + *out++ = (gpr_uint8)(length >> 16); + *out++ = (gpr_uint8)(length >> 8); + *out++ = (gpr_uint8)(length); *out++ = GRPC_CHTTP2_FRAME_SETTINGS; *out++ = flags; *out++ = 0; @@ -76,26 +76,26 @@ static gpr_uint8 *fill_header(gpr_uint8 *out, gpr_uint32 length, gpr_slice grpc_chttp2_settings_create(gpr_uint32 *old, const gpr_uint32 *new, gpr_uint32 force_mask, size_t count) { size_t i; - size_t n = 0; + gpr_uint32 n = 0; gpr_slice output; gpr_uint8 *p; for (i = 0; i < count; i++) { - n += (new[i] != old[i] || (force_mask & (1 << i)) != 0); + n += (new[i] != old[i] || (force_mask & (1u << i)) != 0); } output = gpr_slice_malloc(9 + 6 * n); p = fill_header(GPR_SLICE_START_PTR(output), 6 * n, 0); for (i = 0; i < count; i++) { - if (new[i] != old[i] || (force_mask & (1 << i)) != 0) { + if (new[i] != old[i] || (force_mask & (1u << i)) != 0) { GPR_ASSERT(i); - *p++ = i >> 8; - *p++ = i; - *p++ = new[i] >> 24; - *p++ = new[i] >> 16; - *p++ = new[i] >> 8; - *p++ = new[i]; + *p++ = (gpr_uint8)(i >> 8); + *p++ = (gpr_uint8)(i); + *p++ = (gpr_uint8)(new[i] >> 24); + *p++ = (gpr_uint8)(new[i] >> 16); + *p++ = (gpr_uint8)(new[i] >> 8); + *p++ = (gpr_uint8)(new[i]); old[i] = new[i]; } } @@ -162,7 +162,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( } return GRPC_CHTTP2_PARSE_OK; } - parser->id = ((gpr_uint16)*cur) << 8; + parser->id = (gpr_uint16)(((gpr_uint16)*cur) << 8); cur++; /* fallthrough */ case GRPC_CHTTP2_SPS_ID1: @@ -170,7 +170,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( parser->state = GRPC_CHTTP2_SPS_ID1; return GRPC_CHTTP2_PARSE_OK; } - parser->id |= (*cur); + parser->id = (gpr_uint16)(parser->id | (*cur)); cur++; /* fallthrough */ case GRPC_CHTTP2_SPS_VAL0: diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index d624298ad2..ea13969e8c 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -48,14 +48,14 @@ gpr_slice grpc_chttp2_window_update_create(gpr_uint32 id, *p++ = 4; *p++ = GRPC_CHTTP2_FRAME_WINDOW_UPDATE; *p++ = 0; - *p++ = id >> 24; - *p++ = id >> 16; - *p++ = id >> 8; - *p++ = id; - *p++ = window_update >> 24; - *p++ = window_update >> 16; - *p++ = window_update >> 8; - *p++ = window_update; + *p++ = (gpr_uint8)(id >> 24); + *p++ = (gpr_uint8)(id >> 16); + *p++ = (gpr_uint8)(id >> 8); + *p++ = (gpr_uint8)(id); + *p++ = (gpr_uint8)(window_update >> 24); + *p++ = (gpr_uint8)(window_update >> 16); + *p++ = (gpr_uint8)(window_update >> 8); + *p++ = (gpr_uint8)(window_update); return slice; } diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index f8bff42ed6..9c40e8a4e6 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -1085,11 +1085,13 @@ static int parse_string_prefix(grpc_chttp2_hpack_parser *p, static void append_bytes(grpc_chttp2_hpack_parser_string *str, const gpr_uint8 *data, size_t length) { if (length + str->length > str->capacity) { - str->capacity = str->length + length; + GPR_ASSERT(str->length + length <= GPR_UINT32_MAX); + str->capacity = (gpr_uint32)(str->length + length); str->str = gpr_realloc(str->str, str->capacity); } memcpy(str->str + str->length, data, length); - str->length += length; + GPR_ASSERT(length <= GPR_UINT32_MAX - str->length); + str->length += (gpr_uint32)length; } static int append_string(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, @@ -1099,7 +1101,7 @@ static int append_string(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, gpr_uint8 decoded[3]; switch ((binary_state)p->binary) { case NOT_BINARY: - append_bytes(str, cur, end - cur); + append_bytes(str, cur, (size_t)(end - cur)); return 1; b64_byte0: case B64_BYTE0: @@ -1157,9 +1159,9 @@ static int append_string(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, goto b64_byte3; p->base64_buffer |= bits; bits = p->base64_buffer; - decoded[0] = bits >> 16; - decoded[1] = bits >> 8; - decoded[2] = bits; + decoded[0] = (gpr_uint8)(bits >> 16); + decoded[1] = (gpr_uint8)(bits >> 8); + decoded[2] = (gpr_uint8)(bits); append_bytes(str, decoded, 3); goto b64_byte0; } @@ -1189,7 +1191,7 @@ static int finish_str(grpc_chttp2_hpack_parser *p) { bits & 0xffff); return 0; } - decoded[0] = bits >> 16; + decoded[0] = (gpr_uint8)(bits >> 16); append_bytes(str, decoded, 1); break; case B64_BYTE3: @@ -1199,8 +1201,8 @@ static int finish_str(grpc_chttp2_hpack_parser *p) { bits & 0xff); return 0; } - decoded[0] = bits >> 16; - decoded[1] = bits >> 8; + decoded[0] = (gpr_uint8)(bits >> 16); + decoded[1] = (gpr_uint8)(bits >> 8); append_bytes(str, decoded, 2); break; } @@ -1249,13 +1251,14 @@ static int add_str_bytes(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, static int parse_string(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { size_t remaining = p->strlen - p->strgot; - size_t given = end - cur; + size_t given = (size_t)(end - cur); if (remaining <= given) { return add_str_bytes(p, cur, cur + remaining) && finish_str(p) && parse_next(p, cur + remaining, end); } else { if (!add_str_bytes(p, cur, cur + given)) return 0; - p->strgot += given; + GPR_ASSERT(given <= GPR_UINT32_MAX - p->strgot); + p->strgot += (gpr_uint32)given; p->state = parse_string; return 1; } diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h index c1768d9d5d..4f489d67fb 100644 --- a/src/core/transport/chttp2/hpack_parser.h +++ b/src/core/transport/chttp2/hpack_parser.h @@ -79,7 +79,7 @@ struct grpc_chttp2_hpack_parser { /* number of source bytes read for the currently parsing string */ gpr_uint32 strgot; /* huffman decoding state */ - gpr_uint16 huff_state; + gpr_int16 huff_state; /* is the string being decoded binary? */ gpr_uint8 binary; /* is the current string huffman encoded? */ diff --git a/src/core/transport/chttp2/hpack_table.c b/src/core/transport/chttp2/hpack_table.c index 4fc154380e..e18778ab0b 100644 --- a/src/core/transport/chttp2/hpack_table.c +++ b/src/core/transport/chttp2/hpack_table.c @@ -139,7 +139,7 @@ grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl, /* Otherwise, find the value in the list of valid entries */ index -= (GRPC_CHTTP2_LAST_STATIC_ENTRY + 1); if (index < tbl->num_ents) { - gpr_uint32 offset = (tbl->num_ents - 1 - index + tbl->first_ent) % + gpr_uint32 offset = (tbl->num_ents - 1u - index + tbl->first_ent) % GRPC_CHTTP2_MAX_TABLE_COUNT; return tbl->ents[offset]; } @@ -150,19 +150,22 @@ grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl, /* Evict one element from the table */ static void evict1(grpc_chttp2_hptbl *tbl) { grpc_mdelem *first_ent = tbl->ents[tbl->first_ent]; - tbl->mem_used -= GPR_SLICE_LENGTH(first_ent->key->slice) + - GPR_SLICE_LENGTH(first_ent->value->slice) + - GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD; - tbl->first_ent = (tbl->first_ent + 1) % GRPC_CHTTP2_MAX_TABLE_COUNT; + size_t elem_bytes = GPR_SLICE_LENGTH(first_ent->key->slice) + + GPR_SLICE_LENGTH(first_ent->value->slice) + + GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD; + GPR_ASSERT(elem_bytes <= tbl->mem_used); + tbl->mem_used = (gpr_uint16)(tbl->mem_used - elem_bytes); + tbl->first_ent = + (gpr_uint16)((tbl->first_ent + 1) % GRPC_CHTTP2_MAX_TABLE_COUNT); tbl->num_ents--; GRPC_MDELEM_UNREF(first_ent); } void grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) { /* determine how many bytes of buffer this entry represents */ - gpr_uint16 elem_bytes = GPR_SLICE_LENGTH(md->key->slice) + - GPR_SLICE_LENGTH(md->value->slice) + - GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD; + size_t elem_bytes = GPR_SLICE_LENGTH(md->key->slice) + + GPR_SLICE_LENGTH(md->value->slice) + + GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD; /* we can't add elements bigger than the max table size */ if (elem_bytes > tbl->max_bytes) { @@ -182,7 +185,7 @@ void grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) { } /* evict entries to ensure no overflow */ - while (elem_bytes > tbl->max_bytes - tbl->mem_used) { + while (elem_bytes > (size_t)tbl->max_bytes - tbl->mem_used) { evict1(tbl); } @@ -190,28 +193,30 @@ void grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) { tbl->ents[tbl->last_ent] = md; /* update accounting values */ - tbl->last_ent = (tbl->last_ent + 1) % GRPC_CHTTP2_MAX_TABLE_COUNT; + tbl->last_ent = + (gpr_uint16)((tbl->last_ent + 1) % GRPC_CHTTP2_MAX_TABLE_COUNT); tbl->num_ents++; - tbl->mem_used += elem_bytes; + tbl->mem_used = (gpr_uint16)(tbl->mem_used + elem_bytes); } grpc_chttp2_hptbl_find_result grpc_chttp2_hptbl_find( const grpc_chttp2_hptbl *tbl, grpc_mdelem *md) { grpc_chttp2_hptbl_find_result r = {0, 0}; - int i; + gpr_uint16 i; /* See if the string is in the static table */ for (i = 0; i < GRPC_CHTTP2_LAST_STATIC_ENTRY; i++) { grpc_mdelem *ent = tbl->static_ents[i]; if (md->key != ent->key) continue; - r.index = i + 1; + r.index = (gpr_uint16)(i + 1); r.has_value = md->value == ent->value; if (r.has_value) return r; } /* Scan the dynamic table */ for (i = 0; i < tbl->num_ents; i++) { - int idx = tbl->num_ents - i + GRPC_CHTTP2_LAST_STATIC_ENTRY; + gpr_uint16 idx = + (gpr_uint16)(tbl->num_ents - i + GRPC_CHTTP2_LAST_STATIC_ENTRY); grpc_mdelem *ent = tbl->ents[(tbl->first_ent + i) % GRPC_CHTTP2_MAX_TABLE_COUNT]; if (md->key != ent->key) continue; diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index 974b864ffb..d216c42113 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -122,7 +122,7 @@ void grpc_incoming_metadata_buffer_move_to_referencing_sopb( for (i = 0; i < sopb->nops; i++) { if (sopb->ops[i].type != GRPC_OP_METADATA) continue; sopb->ops[i].data.metadata.list.tail = - (void *)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail); + (void *)(delta + (gpr_uintptr)sopb->ops[i].data.metadata.list.tail); } src->count = 0; } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 7a42de9245..c8c1abb750 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -168,7 +168,7 @@ typedef struct { grpc_iomgr_closure *pending_closures_tail; /** window available for us to send to peer */ - gpr_uint32 outgoing_window; + gpr_int64 outgoing_window; /** window available for peer to send to us - updated after parse */ gpr_uint32 incoming_window; /** how much window would we like to have for incoming_window */ @@ -280,7 +280,7 @@ struct grpc_chttp2_transport_parsing { gpr_uint32 goaway_last_stream_index; gpr_slice goaway_text; - gpr_uint64 outgoing_window_update; + gpr_int64 outgoing_window_update; /** pings awaiting responses */ grpc_chttp2_outstanding_ping pings; @@ -609,20 +609,21 @@ extern int grpc_flowctl_trace; else \ stmt -#define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \ - delta) \ - if (!(grpc_flowctl_trace)) { \ - } else { \ - grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \ - transport->is_client, context->id, context->var, \ - delta); \ +#define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \ + delta) \ + if (!(grpc_flowctl_trace)) { \ + } else { \ + grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \ + transport->is_client, context->id, \ + (gpr_int64)(context->var), (gpr_int64)(delta)); \ } -#define GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(reason, context, var, delta) \ - if (!(grpc_flowctl_trace)) { \ - } else { \ - grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \ - context->is_client, 0, context->var, delta); \ +#define GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(reason, context, var, delta) \ + if (!(grpc_flowctl_trace)) { \ + } else { \ + grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \ + context->is_client, 0, \ + (gpr_int64)(context->var), (gpr_int64)(delta)); \ } void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index dc5eb18e42..f26f446787 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -131,7 +131,7 @@ void grpc_chttp2_publish_reads( published later */ if (transport_parsing->goaway_received) { grpc_chttp2_add_incoming_goaway(transport_global, - transport_parsing->goaway_error, + (gpr_uint32)transport_parsing->goaway_error, transport_parsing->goaway_text); transport_parsing->goaway_text = gpr_empty_slice(); transport_parsing->goaway_received = 0; @@ -194,7 +194,9 @@ void grpc_chttp2_publish_reads( GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( "parsed", transport_parsing, stream_parsing, outgoing_window_update, -(gpr_int64)stream_parsing->outgoing_window_update); - stream_global->outgoing_window += stream_parsing->outgoing_window_update; + GPR_ASSERT(stream_parsing->outgoing_window_update <= GPR_UINT32_MAX); + stream_global->outgoing_window += + (gpr_uint32)stream_parsing->outgoing_window_update; stream_parsing->outgoing_window_update = 0; is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { @@ -211,7 +213,7 @@ void grpc_chttp2_publish_reads( if (stream_parsing->saw_rst_stream) { stream_global->cancelled = 1; stream_global->cancelled_status = grpc_chttp2_http2_error_to_grpc_status( - stream_parsing->rst_stream_reason); + (grpc_chttp2_error_code)stream_parsing->rst_stream_reason); if (stream_parsing->rst_stream_reason == GRPC_CHTTP2_NO_ERROR) { stream_global->published_cancelled = 1; } @@ -379,9 +381,10 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, case GRPC_DTS_FRAME: GPR_ASSERT(cur < end); if ((gpr_uint32)(end - cur) == transport_parsing->incoming_frame_size) { - if (!parse_frame_slice( - transport_parsing, - gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) { + if (!parse_frame_slice(transport_parsing, + gpr_slice_sub_no_ref(slice, (size_t)(cur - beg), + (size_t)(end - beg)), + 1)) { return 0; } transport_parsing->deframe_state = GRPC_DTS_FH_0; @@ -389,11 +392,12 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, return 1; } else if ((gpr_uint32)(end - cur) > transport_parsing->incoming_frame_size) { + size_t cur_offset = (size_t)(cur - beg); if (!parse_frame_slice( transport_parsing, gpr_slice_sub_no_ref( - slice, cur - beg, - cur + transport_parsing->incoming_frame_size - beg), + slice, cur_offset, + cur_offset + transport_parsing->incoming_frame_size), 1)) { return 0; } @@ -401,12 +405,13 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, transport_parsing->incoming_stream = NULL; goto dts_fh_0; /* loop */ } else { - if (!parse_frame_slice( - transport_parsing, - gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) { + if (!parse_frame_slice(transport_parsing, + gpr_slice_sub_no_ref(slice, (size_t)(cur - beg), + (size_t)(end - beg)), + 0)) { return 0; } - transport_parsing->incoming_frame_size -= (end - cur); + transport_parsing->incoming_frame_size -= (gpr_uint32)(end - cur); return 1; } gpr_log(GPR_ERROR, "should never reach here"); @@ -474,14 +479,14 @@ static void skip_header(void *tp, grpc_mdelem *md) { GRPC_MDELEM_UNREF(md); } static int init_skip_frame_parser( grpc_chttp2_transport_parsing *transport_parsing, int is_header) { if (is_header) { - int is_eoh = transport_parsing->expect_continuation_stream_id != 0; + gpr_uint8 is_eoh = transport_parsing->expect_continuation_stream_id != 0; transport_parsing->parser = grpc_chttp2_header_parser_parse; transport_parsing->parser_data = &transport_parsing->hpack_parser; transport_parsing->hpack_parser.on_header = skip_header; transport_parsing->hpack_parser.on_header_user_data = NULL; transport_parsing->hpack_parser.is_boundary = is_eoh; transport_parsing->hpack_parser.is_eof = - is_eoh ? transport_parsing->header_eof : 0; + (gpr_uint8)(is_eoh ? transport_parsing->header_eof : 0); } else { transport_parsing->parser = skip_parser; } @@ -617,8 +622,8 @@ static void on_header(void *tp, grpc_mdelem *md) { static int init_header_frame_parser( grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) { - int is_eoh = (transport_parsing->incoming_frame_flags & - GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; + gpr_uint8 is_eoh = (transport_parsing->incoming_frame_flags & + GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; int via_accept = 0; grpc_chttp2_stream_parsing *stream_parsing; @@ -691,7 +696,7 @@ static int init_header_frame_parser( transport_parsing->hpack_parser.on_header_user_data = transport_parsing; transport_parsing->hpack_parser.is_boundary = is_eoh; transport_parsing->hpack_parser.is_eof = - is_eoh ? transport_parsing->header_eof : 0; + (gpr_uint8)(is_eoh ? transport_parsing->header_eof : 0); if (!is_continuation && (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) { grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser); diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 1ea697f71e..6a22532bc2 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -74,17 +74,18 @@ typedef struct { } framer_state; /* fills p (which is expected to be 9 bytes long) with a data frame header */ -static void fill_header(gpr_uint8 *p, gpr_uint8 type, gpr_uint32 id, - gpr_uint32 len, gpr_uint8 flags) { - *p++ = len >> 16; - *p++ = len >> 8; - *p++ = len; +static void fill_header(gpr_uint8 *p, gpr_uint8 type, gpr_uint32 id, size_t len, + gpr_uint8 flags) { + GPR_ASSERT(len < 16777316); + *p++ = (gpr_uint8)(len >> 16); + *p++ = (gpr_uint8)(len >> 8); + *p++ = (gpr_uint8)(len); *p++ = type; *p++ = flags; - *p++ = id >> 24; - *p++ = id >> 16; - *p++ = id >> 8; - *p++ = id; + *p++ = (gpr_uint8)(id >> 24); + *p++ = (gpr_uint8)(id >> 16); + *p++ = (gpr_uint8)(id >> 8); + *p++ = (gpr_uint8)(id); } /* finish a frame - fill in the previously reserved header */ @@ -105,11 +106,12 @@ static void finish_frame(framer_state *st, int is_header_boundary, case NONE: return; } - fill_header(GPR_SLICE_START_PTR(st->output->slices[st->header_idx]), type, - st->stream_id, - st->output->length - st->output_length_at_start_of_frame, - (is_last_in_stream ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0) | - (is_header_boundary ? GRPC_CHTTP2_DATA_FLAG_END_HEADERS : 0)); + fill_header( + GPR_SLICE_START_PTR(st->output->slices[st->header_idx]), type, + st->stream_id, st->output->length - st->output_length_at_start_of_frame, + (gpr_uint8)( + (is_last_in_stream ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0) | + (is_header_boundary ? GRPC_CHTTP2_DATA_FLAG_END_HEADERS : 0))); st->cur_frame_type = NONE; } @@ -134,7 +136,7 @@ static void begin_new_frame(framer_state *st, frame_type type) { space to add at least about_to_add bytes -- finishes the current frame if needed */ static void ensure_frame_type(framer_state *st, frame_type type, - int need_bytes) { + size_t need_bytes) { if (st->cur_frame_type == type && st->output->length - st->output_length_at_start_of_frame + need_bytes <= GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) { @@ -174,7 +176,7 @@ static void add_header_data(framer_state *st, gpr_slice slice) { } } -static gpr_uint8 *add_tiny_header_data(framer_state *st, int len) { +static gpr_uint8 *add_tiny_header_data(framer_state *st, size_t len) { ensure_frame_type(st, HEADER, len); return gpr_slice_buffer_tiny_add(st->output, len); } @@ -185,10 +187,12 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c, gpr_uint32 key_hash = elem->key->hash; gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash); gpr_uint32 new_index = c->tail_remote_index + c->table_elems + 1; - gpr_uint32 elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) + - GPR_SLICE_LENGTH(elem->value->slice); + size_t elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) + + GPR_SLICE_LENGTH(elem->value->slice); grpc_mdelem *elem_to_unref; + GPR_ASSERT(elem_size < 65536); + /* Reserve space for this element in the remote table: if this overflows the current table, drop elements until it fits, matching the decompressor algorithm */ @@ -200,14 +204,16 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c, c->table_elem_size[c->tail_remote_index % GRPC_CHTTP2_HPACKC_MAX_TABLE_ELEMS]); GPR_ASSERT(c->table_elems > 0); - c->table_size -= c->table_elem_size[c->tail_remote_index % - GRPC_CHTTP2_HPACKC_MAX_TABLE_ELEMS]; + c->table_size = + (gpr_uint16)(c->table_size - + c->table_elem_size[c->tail_remote_index % + GRPC_CHTTP2_HPACKC_MAX_TABLE_ELEMS]); c->table_elems--; } GPR_ASSERT(c->table_elems < GRPC_CHTTP2_HPACKC_MAX_TABLE_ELEMS); c->table_elem_size[new_index % GRPC_CHTTP2_HPACKC_MAX_TABLE_ELEMS] = - elem_size; - c->table_size += elem_size; + (gpr_uint16)elem_size; + c->table_size = (gpr_uint16)(c->table_size + elem_size); c->table_elems++; /* Store this element into {entries,indices}_elem */ @@ -270,7 +276,7 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c, static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 index, framer_state *st) { - int len = GRPC_CHTTP2_VARINT_LENGTH(index, 1); + gpr_uint32 len = GRPC_CHTTP2_VARINT_LENGTH(index, 1); GRPC_CHTTP2_WRITE_VARINT(index, 1, 0x80, add_tiny_header_data(st, len), len); } @@ -288,14 +294,16 @@ static gpr_slice get_wire_value(grpc_mdelem *elem, gpr_uint8 *huffman_prefix) { static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c, gpr_uint32 key_index, grpc_mdelem *elem, framer_state *st) { - int len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 2); + gpr_uint32 len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 2); gpr_uint8 huffman_prefix; gpr_slice value_slice = get_wire_value(elem, &huffman_prefix); - int len_val = GPR_SLICE_LENGTH(value_slice); - int len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); + size_t len_val = GPR_SLICE_LENGTH(value_slice); + gpr_uint32 len_val_len; + GPR_ASSERT(len_val <= GPR_UINT32_MAX); + len_val_len = GRPC_CHTTP2_VARINT_LENGTH((gpr_uint32)len_val, 1); GRPC_CHTTP2_WRITE_VARINT(key_index, 2, 0x40, add_tiny_header_data(st, len_pfx), len_pfx); - GRPC_CHTTP2_WRITE_VARINT(len_val, 1, 0x00, + GRPC_CHTTP2_WRITE_VARINT((gpr_uint32)len_val, 1, 0x00, add_tiny_header_data(st, len_val_len), len_val_len); add_header_data(st, gpr_slice_ref(value_slice)); } @@ -303,26 +311,30 @@ static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c, static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, gpr_uint32 key_index, grpc_mdelem *elem, framer_state *st) { - int len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 4); + gpr_uint32 len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 4); gpr_uint8 huffman_prefix; gpr_slice value_slice = get_wire_value(elem, &huffman_prefix); - int len_val = GPR_SLICE_LENGTH(value_slice); - int len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); + size_t len_val = GPR_SLICE_LENGTH(value_slice); + gpr_uint32 len_val_len; + GPR_ASSERT(len_val <= GPR_UINT32_MAX); + len_val_len = GRPC_CHTTP2_VARINT_LENGTH((gpr_uint32)len_val, 1); GRPC_CHTTP2_WRITE_VARINT(key_index, 4, 0x00, add_tiny_header_data(st, len_pfx), len_pfx); - GRPC_CHTTP2_WRITE_VARINT(len_val, 1, 0x00, + GRPC_CHTTP2_WRITE_VARINT((gpr_uint32)len_val, 1, 0x00, add_tiny_header_data(st, len_val_len), len_val_len); add_header_data(st, gpr_slice_ref(value_slice)); } static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, framer_state *st) { - int len_key = GPR_SLICE_LENGTH(elem->key->slice); + gpr_uint32 len_key = (gpr_uint32)GPR_SLICE_LENGTH(elem->key->slice); gpr_uint8 huffman_prefix; gpr_slice value_slice = get_wire_value(elem, &huffman_prefix); - int len_val = GPR_SLICE_LENGTH(value_slice); - int len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); - int len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); + gpr_uint32 len_val = (gpr_uint32)GPR_SLICE_LENGTH(value_slice); + gpr_uint32 len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); + gpr_uint32 len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); + GPR_ASSERT(len_key <= GPR_UINT32_MAX); + GPR_ASSERT(GPR_SLICE_LENGTH(value_slice) <= GPR_UINT32_MAX); *add_tiny_header_data(st, 1) = 0x40; GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00, add_tiny_header_data(st, len_key_len), len_key_len); @@ -334,12 +346,14 @@ static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c, static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, framer_state *st) { - int len_key = GPR_SLICE_LENGTH(elem->key->slice); + gpr_uint32 len_key = (gpr_uint32)GPR_SLICE_LENGTH(elem->key->slice); gpr_uint8 huffman_prefix; gpr_slice value_slice = get_wire_value(elem, &huffman_prefix); - int len_val = GPR_SLICE_LENGTH(value_slice); - int len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); - int len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); + gpr_uint32 len_val = (gpr_uint32)GPR_SLICE_LENGTH(value_slice); + gpr_uint32 len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); + gpr_uint32 len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); + GPR_ASSERT(len_key <= GPR_UINT32_MAX); + GPR_ASSERT(GPR_SLICE_LENGTH(value_slice) <= GPR_UINT32_MAX); *add_tiny_header_data(st, 1) = 0x00; GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00, add_tiny_header_data(st, len_key_len), len_key_len); @@ -488,7 +502,7 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, gpr_uint32 flow_controlled_bytes_taken = 0; gpr_uint32 curop = 0; gpr_uint8 *p; - int compressed_flag_set = 0; + gpr_uint8 compressed_flag_set = 0; while (curop < *inops_count) { GPR_ASSERT(flow_controlled_bytes_taken <= max_flow_controlled_bytes); @@ -514,10 +528,10 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, p = GPR_SLICE_START_PTR(slice); p[0] = compressed_flag_set; - p[1] = op->data.begin_message.length >> 24; - p[2] = op->data.begin_message.length >> 16; - p[3] = op->data.begin_message.length >> 8; - p[4] = op->data.begin_message.length; + p[1] = (gpr_uint8)(op->data.begin_message.length >> 24); + p[2] = (gpr_uint8)(op->data.begin_message.length >> 16); + p[3] = (gpr_uint8)(op->data.begin_message.length >> 8); + p[4] = (gpr_uint8)(op->data.begin_message.length); op->type = GRPC_OP_SLICE; op->data.slice = slice; /* fallthrough */ @@ -541,7 +555,7 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, grpc_sopb_append(outops, op, 1); curop++; } - flow_controlled_bytes_taken += GPR_SLICE_LENGTH(slice); + flow_controlled_bytes_taken += (gpr_uint32)GPR_SLICE_LENGTH(slice); break; } } @@ -565,7 +579,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, framer_state st; gpr_slice slice; grpc_stream_op *op; - gpr_uint32 max_take_size; + size_t max_take_size; gpr_uint32 curop = 0; gpr_uint32 unref_op; grpc_mdctx *mdctx = compressor->mdctx; diff --git a/src/core/transport/chttp2/timeout_encoding.c b/src/core/transport/chttp2/timeout_encoding.c index 1dd768ada4..8a9b290ecb 100644 --- a/src/core/transport/chttp2/timeout_encoding.c +++ b/src/core/transport/chttp2/timeout_encoding.c @@ -123,7 +123,7 @@ void grpc_chttp2_encode_timeout(gpr_timespec timeout, char *buffer) { enc_nanos(buffer, timeout.tv_nsec); } else if (timeout.tv_sec < 1000 && timeout.tv_nsec != 0) { enc_micros(buffer, - timeout.tv_sec * 1000000 + + (int)(timeout.tv_sec * 1000000) + (timeout.tv_nsec / 1000 + (timeout.tv_nsec % 1000 != 0))); } else { enc_seconds(buffer, timeout.tv_sec + (timeout.tv_nsec != 0)); @@ -137,14 +137,14 @@ static int is_all_whitespace(const char *p) { int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) { gpr_uint32 x = 0; - const char *p = buffer; + const gpr_uint8 *p = (const gpr_uint8 *)buffer; int have_digit = 0; /* skip whitespace */ for (; *p == ' '; p++) ; /* decode numeric part */ for (; *p >= '0' && *p <= '9'; p++) { - gpr_uint32 xp = x * 10 + *p - '0'; + gpr_uint32 xp = x * 10u + (gpr_uint32)*p - (gpr_uint32)'0'; have_digit = 1; if (xp < x) { *timeout = gpr_inf_future(GPR_CLOCK_REALTIME); @@ -180,5 +180,5 @@ int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) { return 0; } p++; - return is_all_whitespace(p); + return is_all_whitespace((const char *)p); } diff --git a/src/core/transport/chttp2/varint.c b/src/core/transport/chttp2/varint.c index 0722c9ada9..056f68047b 100644 --- a/src/core/transport/chttp2/varint.c +++ b/src/core/transport/chttp2/varint.c @@ -33,7 +33,7 @@ #include "src/core/transport/chttp2/varint.h" -int grpc_chttp2_hpack_varint_length(gpr_uint32 tail_value) { +gpr_uint32 grpc_chttp2_hpack_varint_length(gpr_uint32 tail_value) { if (tail_value < (1 << 7)) { return 2; } else if (tail_value < (1 << 14)) { @@ -48,7 +48,8 @@ int grpc_chttp2_hpack_varint_length(gpr_uint32 tail_value) { } void grpc_chttp2_hpack_write_varint_tail(gpr_uint32 tail_value, - gpr_uint8* target, int tail_length) { + gpr_uint8* target, + gpr_uint32 tail_length) { switch (tail_length) { case 5: target[4] = (gpr_uint8)((tail_value >> 28) | 0x80); diff --git a/src/core/transport/chttp2/varint.h b/src/core/transport/chttp2/varint.h index 0a6fb55248..4dfcc76773 100644 --- a/src/core/transport/chttp2/varint.h +++ b/src/core/transport/chttp2/varint.h @@ -41,10 +41,11 @@ /* length of a value that needs varint tail encoding (it's bigger than can be bitpacked into the opcode byte) - returned value includes the length of the opcode byte */ -int grpc_chttp2_hpack_varint_length(gpr_uint32 tail_value); +gpr_uint32 grpc_chttp2_hpack_varint_length(gpr_uint32 tail_value); void grpc_chttp2_hpack_write_varint_tail(gpr_uint32 tail_value, - gpr_uint8* target, int tail_length); + gpr_uint8* target, + gpr_uint32 tail_length); /* maximum value that can be bitpacked with the opcode if the opcode has a prefix @@ -54,15 +55,15 @@ void grpc_chttp2_hpack_write_varint_tail(gpr_uint32 tail_value, /* length required to bitpack a value */ #define GRPC_CHTTP2_VARINT_LENGTH(n, prefix_bits) \ ((n) < GRPC_CHTTP2_MAX_IN_PREFIX(prefix_bits) \ - ? 1 \ + ? 1u \ : grpc_chttp2_hpack_varint_length( \ (n)-GRPC_CHTTP2_MAX_IN_PREFIX(prefix_bits))) #define GRPC_CHTTP2_WRITE_VARINT(n, prefix_bits, prefix_or, target, length) \ do { \ gpr_uint8* tgt = target; \ - if ((length) == 1) { \ - (tgt)[0] = (prefix_or) | (n); \ + if ((length) == 1u) { \ + (tgt)[0] = (gpr_uint8)((prefix_or) | (n)); \ } else { \ (tgt)[0] = (prefix_or) | GRPC_CHTTP2_MAX_IN_PREFIX(prefix_bits); \ grpc_chttp2_hpack_write_varint_tail( \ diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index ac79044e08..c015e82931 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -32,10 +32,13 @@ */ #include "src/core/transport/chttp2/internal.h" -#include "src/core/transport/chttp2/http2_errors.h" + +#include <limits.h> #include <grpc/support/log.h> +#include "src/core/transport/chttp2/http2_errors.h" + static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); int grpc_chttp2_unlocking_check_writes( @@ -78,12 +81,13 @@ int grpc_chttp2_unlocking_check_writes( stream_writing->send_closed = GRPC_DONT_SEND_CLOSED; if (stream_global->outgoing_sopb) { - window_delta = - grpc_chttp2_preencode(stream_global->outgoing_sopb->ops, - &stream_global->outgoing_sopb->nops, - GPR_MIN(transport_global->outgoing_window, - stream_global->outgoing_window), - &stream_writing->sopb); + window_delta = grpc_chttp2_preencode( + stream_global->outgoing_sopb->ops, + &stream_global->outgoing_sopb->nops, + (gpr_uint32)GPR_MIN(GPR_MIN(transport_global->outgoing_window, + stream_global->outgoing_window), + GPR_UINT32_MAX), + &stream_writing->sopb); GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( "write", transport_global, outgoing_window, -(gpr_int64)window_delta); GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 9e3d7dd551..deb2fedf0c 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -209,7 +209,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } static void init_transport(grpc_chttp2_transport *t, const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, - int is_client) { + gpr_uint8 is_client) { size_t i; int j; @@ -306,7 +306,7 @@ static void init_transport(grpc_chttp2_transport *t, GRPC_ARG_MAX_CONCURRENT_STREAMS); } else { push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, - channel_args->args[i].value.integer); + (gpr_uint32)channel_args->args[i].value.integer); } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { @@ -320,7 +320,8 @@ static void init_transport(grpc_chttp2_transport *t, t->global.next_stream_id & 1, is_client ? "client" : "server"); } else { - t->global.next_stream_id = channel_args->args[i].value.integer; + t->global.next_stream_id = + (gpr_uint32)channel_args->args[i].value.integer; } } } @@ -682,6 +683,8 @@ static void perform_stream_op_locked( stream_global->publish_sopb = op->recv_ops; stream_global->publish_sopb->nops = 0; stream_global->publish_state = op->recv_state; + /* clamp max recv bytes */ + op->max_recv_bytes = GPR_MIN(op->max_recv_bytes, GPR_UINT32_MAX); if (stream_global->max_recv_bytes < op->max_recv_bytes) { GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( "op", transport_global, stream_global, max_recv_bytes, @@ -690,8 +693,8 @@ static void perform_stream_op_locked( "op", transport_global, stream_global, unannounced_incoming_window, op->max_recv_bytes - stream_global->max_recv_bytes); stream_global->unannounced_incoming_window += - op->max_recv_bytes - stream_global->max_recv_bytes; - stream_global->max_recv_bytes = op->max_recv_bytes; + (gpr_uint32)op->max_recv_bytes - stream_global->max_recv_bytes; + stream_global->max_recv_bytes = (gpr_uint32)op->max_recv_bytes; } grpc_chttp2_incoming_metadata_live_op_buffer_end( &stream_global->outstanding_metadata); @@ -728,14 +731,14 @@ static void send_ping_locked(grpc_chttp2_transport *t, p->next = &t->global.pings; p->prev = p->next->prev; p->prev->next = p->next->prev = p; - p->id[0] = (t->global.ping_counter >> 56) & 0xff; - p->id[1] = (t->global.ping_counter >> 48) & 0xff; - p->id[2] = (t->global.ping_counter >> 40) & 0xff; - p->id[3] = (t->global.ping_counter >> 32) & 0xff; - p->id[4] = (t->global.ping_counter >> 24) & 0xff; - p->id[5] = (t->global.ping_counter >> 16) & 0xff; - p->id[6] = (t->global.ping_counter >> 8) & 0xff; - p->id[7] = t->global.ping_counter & 0xff; + p->id[0] = (gpr_uint8)((t->global.ping_counter >> 56) & 0xff); + p->id[1] = (gpr_uint8)((t->global.ping_counter >> 48) & 0xff); + p->id[2] = (gpr_uint8)((t->global.ping_counter >> 40) & 0xff); + p->id[3] = (gpr_uint8)((t->global.ping_counter >> 32) & 0xff); + p->id[4] = (gpr_uint8)((t->global.ping_counter >> 24) & 0xff); + p->id[5] = (gpr_uint8)((t->global.ping_counter >> 16) & 0xff); + p->id[6] = (gpr_uint8)((t->global.ping_counter >> 8) & 0xff); + p->id[7] = (gpr_uint8)(t->global.ping_counter & 0xff); p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); } @@ -760,7 +763,7 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { t->global.sent_goaway = 1; grpc_chttp2_goaway_append( t->global.last_incoming_stream_id, - grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), + (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), gpr_slice_ref(*op->goaway_message), &t->global.qbuf); close_transport = !grpc_chttp2_has_streams(t); } @@ -828,8 +831,9 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + grpc_chttp2_stream_map_size(&t->new_stream_map); + GPR_ASSERT(new_stream_count <= GPR_UINT32_MAX); if (new_stream_count != t->global.concurrent_stream_count) { - t->global.concurrent_stream_count = new_stream_count; + t->global.concurrent_stream_count = (gpr_uint32)new_stream_count; maybe_start_some_streams(&t->global); } } @@ -942,7 +946,8 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, gpr_slice_buffer_add( &transport_global->qbuf, grpc_chttp2_rst_stream_create( - stream_global->id, grpc_chttp2_grpc_status_to_http2_error(status))); + stream_global->id, + (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(status))); } grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); @@ -988,14 +993,14 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global, *p++ = 's'; if (status < 10) { *p++ = 1; - *p++ = '0' + status; + *p++ = (gpr_uint8)('0' + status); } else { *p++ = 2; - *p++ = '0' + (status / 10); - *p++ = '0' + (status % 10); + *p++ = (gpr_uint8)('0' + (status / 10)); + *p++ = (gpr_uint8)('0' + (status % 10)); } GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr)); - len += GPR_SLICE_LENGTH(status_hdr); + len += (gpr_uint32)GPR_SLICE_LENGTH(status_hdr); if (optional_message) { GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127); @@ -1015,23 +1020,23 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global, *p++ = 'a'; *p++ = 'g'; *p++ = 'e'; - *p++ = GPR_SLICE_LENGTH(*optional_message); + *p++ = (gpr_uint8)GPR_SLICE_LENGTH(*optional_message); GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx)); - len += GPR_SLICE_LENGTH(message_pfx); - len += GPR_SLICE_LENGTH(*optional_message); + len += (gpr_uint32)GPR_SLICE_LENGTH(message_pfx); + len += (gpr_uint32)GPR_SLICE_LENGTH(*optional_message); } hdr = gpr_slice_malloc(9); p = GPR_SLICE_START_PTR(hdr); - *p++ = len >> 16; - *p++ = len >> 8; - *p++ = len; + *p++ = (gpr_uint8)(len >> 16); + *p++ = (gpr_uint8)(len >> 8); + *p++ = (gpr_uint8)(len); *p++ = GRPC_CHTTP2_FRAME_HEADER; *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS; - *p++ = stream_global->id >> 24; - *p++ = stream_global->id >> 16; - *p++ = stream_global->id >> 8; - *p++ = stream_global->id; + *p++ = (gpr_uint8)(stream_global->id >> 24); + *p++ = (gpr_uint8)(stream_global->id >> 16); + *p++ = (gpr_uint8)(stream_global->id >> 8); + *p++ = (gpr_uint8)(stream_global->id); GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr)); gpr_slice_buffer_add(&transport_global->qbuf, hdr); @@ -1120,7 +1125,7 @@ static int recv_data_loop(grpc_chttp2_transport *t, int *success) { grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); t->global.concurrent_stream_count = - grpc_chttp2_stream_map_size(&t->parsing_stream_map); + (gpr_uint32)grpc_chttp2_stream_map_size(&t->parsing_stream_map); if (t->parsing.initial_window_update != 0) { grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, update_global_window, t); @@ -1277,7 +1282,7 @@ grpc_transport *grpc_create_chttp2_transport( const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, int is_client) { grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); - init_transport(t, channel_args, ep, mdctx, is_client); + init_transport(t, channel_args, ep, mdctx, is_client != 0); return &t->base; } diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 61638764a6..9d135f4356 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -186,7 +186,8 @@ grpc_mdctx *grpc_mdctx_create(void) { /* This seed is used to prevent remote connections from controlling hash table * collisions. It needs to be somewhat unpredictable to a remote connection. */ - return grpc_mdctx_create_with_seed(gpr_now(GPR_CLOCK_REALTIME).tv_nsec); + return grpc_mdctx_create_with_seed( + (gpr_uint32)gpr_now(GPR_CLOCK_REALTIME).tv_nsec); } static void discard_metadata(grpc_mdctx *ctx) { @@ -333,7 +334,7 @@ grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str, grpc_mdstr *ret; for (i = 0; i < len; i++) { if (str[i] >= 'A' && str[i] <= 'Z') { - copy[i] = str[i] - 'A' + 'a'; + copy[i] = (char)(str[i] - 'A' + 'a'); } else { copy[i] = str[i]; } @@ -378,7 +379,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *buf, s->slice.refcount = NULL; memcpy(s->slice.data.inlined.bytes, buf, length); s->slice.data.inlined.bytes[length] = 0; - s->slice.data.inlined.length = length; + s->slice.data.inlined.length = (gpr_uint8)length; } else { /* string data goes after the internal_string header, and we +1 for null terminator */ diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 92c1f38c5e..6e1ec2f64c 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -75,7 +75,7 @@ typedef struct grpc_transport_stream_op { /** The number of bytes this peer is currently prepared to receive. These bytes will be eventually used to replenish per-stream flow control windows. */ - gpr_uint32 max_recv_bytes; + size_t max_recv_bytes; grpc_iomgr_closure *on_done_recv; grpc_pollset *bind_pollset; diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index 29127c4269..b1a975155a 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -100,7 +100,7 @@ static const char* tsi_fake_handshake_message_to_string(int msg) { static tsi_result tsi_fake_handshake_message_from_string( const char* msg_string, tsi_fake_handshake_message* msg) { - int i; + tsi_fake_handshake_message i; for (i = 0; i < TSI_FAKE_HANDSHAKE_MESSAGE_MAX; i++) { if (strncmp(msg_string, tsi_fake_handshake_message_strings[i], strlen(tsi_fake_handshake_message_strings[i])) == 0) { @@ -171,7 +171,7 @@ static tsi_result fill_frame_from_bytes(const unsigned char* incoming_bytes, memcpy(frame->data + frame->offset, bytes_cursor, available_size); bytes_cursor += available_size; frame->offset += available_size; - *incoming_bytes_size = bytes_cursor - incoming_bytes; + *incoming_bytes_size = (size_t)(bytes_cursor - incoming_bytes); return TSI_INCOMPLETE_DATA; } memcpy(frame->data + frame->offset, bytes_cursor, to_read_size); @@ -187,12 +187,12 @@ static tsi_result fill_frame_from_bytes(const unsigned char* incoming_bytes, memcpy(frame->data + frame->offset, bytes_cursor, available_size); frame->offset += available_size; bytes_cursor += available_size; - *incoming_bytes_size = bytes_cursor - incoming_bytes; + *incoming_bytes_size = (size_t)(bytes_cursor - incoming_bytes); return TSI_INCOMPLETE_DATA; } memcpy(frame->data + frame->offset, bytes_cursor, to_read_size); bytes_cursor += to_read_size; - *incoming_bytes_size = bytes_cursor - incoming_bytes; + *incoming_bytes_size = (size_t)(bytes_cursor - incoming_bytes); tsi_fake_frame_reset(frame, 1 /* needs_draining */); return TSI_OK; } @@ -219,7 +219,7 @@ static tsi_result bytes_to_frame(unsigned char* bytes, size_t bytes_size, frame->offset = 0; frame->size = bytes_size + TSI_FAKE_FRAME_HEADER_SIZE; if (!tsi_fake_frame_ensure_size(frame)) return TSI_OUT_OF_RESOURCES; - store32_little_endian(frame->size, frame->data); + store32_little_endian((gpr_uint32)frame->size, frame->data); memcpy(frame->data + TSI_FAKE_FRAME_HEADER_SIZE, bytes, bytes_size); tsi_fake_frame_reset(frame, 1 /* needs draining */); return TSI_OK; @@ -266,7 +266,7 @@ static tsi_result fake_protector_protect(tsi_frame_protector* self, if (frame->size == 0) { /* New frame, create a header. */ size_t written_in_frame_size = 0; - store32_little_endian(impl->max_frame_size, frame_header); + store32_little_endian((gpr_uint32)impl->max_frame_size, frame_header); written_in_frame_size = TSI_FAKE_FRAME_HEADER_SIZE; result = fill_frame_from_bytes(frame_header, &written_in_frame_size, frame); if (result != TSI_INCOMPLETE_DATA) { @@ -303,7 +303,8 @@ static tsi_result fake_protector_protect_flush( frame->size = frame->offset; frame->offset = 0; frame->needs_draining = 1; - store32_little_endian(frame->size, frame->data); /* Overwrite header. */ + store32_little_endian((gpr_uint32)frame->size, + frame->data); /* Overwrite header. */ } result = drain_frame_to_bytes(protected_output_frames, protected_output_frames_size, frame); @@ -384,7 +385,8 @@ static tsi_result fake_handshaker_get_bytes_to_send_to_peer( return TSI_OK; } if (!impl->outgoing.needs_draining) { - int next_message_to_send = impl->next_message_to_send + 2; + tsi_fake_handshake_message next_message_to_send = + impl->next_message_to_send + 2; const char* msg_string = tsi_fake_handshake_message_to_string(impl->next_message_to_send); result = bytes_to_frame((unsigned char*)msg_string, strlen(msg_string), diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 0b416f6c9d..99ce7ecadf 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -131,10 +131,13 @@ static unsigned long openssl_thread_id_cb(void) { static void init_openssl(void) { int i; + int num_locks; SSL_library_init(); SSL_load_error_strings(); OpenSSL_add_all_algorithms(); - openssl_mutexes = malloc(CRYPTO_num_locks() * sizeof(gpr_mu)); + num_locks = CRYPTO_num_locks(); + GPR_ASSERT(num_locks > 0); + openssl_mutexes = malloc((size_t)num_locks * sizeof(gpr_mu)); GPR_ASSERT(openssl_mutexes != NULL); for (i = 0; i < CRYPTO_num_locks(); i++) { gpr_mu_init(&openssl_mutexes[i]); @@ -249,7 +252,7 @@ static tsi_result ssl_get_x509_common_name(X509* cert, unsigned char** utf8, gpr_log(GPR_ERROR, "Could not extract utf8 from asn1 string."); return TSI_OUT_OF_RESOURCES; } - *utf8_size = utf8_returned_size; + *utf8_size = (size_t)utf8_returned_size; return TSI_OK; } @@ -279,8 +282,8 @@ static tsi_result peer_property_from_x509_common_name( /* Gets the subject SANs from an X509 cert as a tsi_peer_property. */ static tsi_result add_subject_alt_names_properties_to_peer( tsi_peer* peer, GENERAL_NAMES* subject_alt_names, - int subject_alt_name_count) { - int i; + size_t subject_alt_name_count) { + size_t i; tsi_result result = TSI_OK; /* Reset for DNS entries filtering. */ @@ -288,7 +291,7 @@ static tsi_result add_subject_alt_names_properties_to_peer( for (i = 0; i < subject_alt_name_count; i++) { GENERAL_NAME* subject_alt_name = - sk_GENERAL_NAME_value(subject_alt_names, i); + sk_GENERAL_NAME_value(subject_alt_names, (int)i); /* Filter out the non-dns entries names. */ if (subject_alt_name->type == GEN_DNS) { unsigned char* dns_name = NULL; @@ -301,7 +304,7 @@ static tsi_result add_subject_alt_names_properties_to_peer( } result = tsi_construct_string_peer_property( TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY, - (const char*)dns_name, dns_name_size, + (const char*)dns_name, (size_t)dns_name_size, &peer->properties[peer->property_count++]); OPENSSL_free(dns_name); if (result != TSI_OK) break; @@ -318,9 +321,12 @@ static tsi_result peer_from_x509(X509* cert, int include_certificate_type, X509_get_ext_d2i(cert, NID_subject_alt_name, 0, 0); int subject_alt_name_count = (subject_alt_names != NULL) ? sk_GENERAL_NAME_num(subject_alt_names) : 0; - size_t property_count = (include_certificate_type ? 1 : 0) + - 1 /* common name */ + subject_alt_name_count; - tsi_result result = tsi_construct_peer(property_count, peer); + size_t property_count; + tsi_result result; + GPR_ASSERT(subject_alt_name_count >= 0); + property_count = (include_certificate_type ? (size_t)1 : 0) + + 1 /* common name */ + (size_t)subject_alt_name_count; + result = tsi_construct_peer(property_count, peer); if (result != TSI_OK) return result; do { if (include_certificate_type) { @@ -334,8 +340,8 @@ static tsi_result peer_from_x509(X509* cert, int include_certificate_type, if (result != TSI_OK) break; if (subject_alt_name_count != 0) { - result = add_subject_alt_names_properties_to_peer(peer, subject_alt_names, - subject_alt_name_count); + result = add_subject_alt_names_properties_to_peer( + peer, subject_alt_names, (size_t)subject_alt_name_count); if (result != TSI_OK) break; } } while (0); @@ -360,7 +366,10 @@ static void log_ssl_error_stack(void) { /* Performs an SSL_read and handle errors. */ static tsi_result do_ssl_read(SSL* ssl, unsigned char* unprotected_bytes, size_t* unprotected_bytes_size) { - int read_from_ssl = SSL_read(ssl, unprotected_bytes, *unprotected_bytes_size); + int read_from_ssl; + GPR_ASSERT(*unprotected_bytes_size <= INT_MAX); + read_from_ssl = + SSL_read(ssl, unprotected_bytes, (int)*unprotected_bytes_size); if (read_from_ssl == 0) { gpr_log(GPR_ERROR, "SSL_read returned 0 unexpectedly."); return TSI_INTERNAL_ERROR; @@ -387,15 +396,17 @@ static tsi_result do_ssl_read(SSL* ssl, unsigned char* unprotected_bytes, return TSI_PROTOCOL_FAILURE; } } - *unprotected_bytes_size = read_from_ssl; + *unprotected_bytes_size = (size_t)read_from_ssl; return TSI_OK; } /* Performs an SSL_write and handle errors. */ static tsi_result do_ssl_write(SSL* ssl, unsigned char* unprotected_bytes, size_t unprotected_bytes_size) { - int ssl_write_result = - SSL_write(ssl, unprotected_bytes, unprotected_bytes_size); + int ssl_write_result; + GPR_ASSERT(unprotected_bytes_size <= INT_MAX); + ssl_write_result = + SSL_write(ssl, unprotected_bytes, (int)unprotected_bytes_size); if (ssl_write_result < 0) { ssl_write_result = SSL_get_error(ssl, ssl_write_result); if (ssl_write_result == SSL_ERROR_WANT_READ) { @@ -417,7 +428,9 @@ static tsi_result ssl_ctx_use_certificate_chain( size_t pem_cert_chain_size) { tsi_result result = TSI_OK; X509* certificate = NULL; - BIO* pem = BIO_new_mem_buf((void*)pem_cert_chain, pem_cert_chain_size); + BIO* pem; + GPR_ASSERT(pem_cert_chain_size <= INT_MAX); + pem = BIO_new_mem_buf((void*)pem_cert_chain, (int)pem_cert_chain_size); if (pem == NULL) return TSI_OUT_OF_RESOURCES; do { @@ -458,7 +471,9 @@ static tsi_result ssl_ctx_use_private_key(SSL_CTX* context, size_t pem_key_size) { tsi_result result = TSI_OK; EVP_PKEY* private_key = NULL; - BIO* pem = BIO_new_mem_buf((void*)pem_key, pem_key_size); + BIO* pem; + GPR_ASSERT(pem_key_size <= INT_MAX); + pem = BIO_new_mem_buf((void*)pem_key, (int)pem_key_size); if (pem == NULL) return TSI_OUT_OF_RESOURCES; do { private_key = PEM_read_bio_PrivateKey(pem, NULL, NULL, ""); @@ -485,8 +500,11 @@ static tsi_result ssl_ctx_load_verification_certs( size_t num_roots = 0; X509* root = NULL; X509_NAME* root_name = NULL; - BIO* pem = BIO_new_mem_buf((void*)pem_roots, pem_roots_size); - X509_STORE* root_store = SSL_CTX_get_cert_store(context); + BIO* pem; + X509_STORE* root_store; + GPR_ASSERT(pem_roots_size <= INT_MAX); + pem = BIO_new_mem_buf((void*)pem_roots, (int)pem_roots_size); + root_store = SSL_CTX_get_cert_store(context); if (root_store == NULL) return TSI_INVALID_ARGUMENT; if (pem == NULL) return TSI_OUT_OF_RESOURCES; if (root_names != NULL) { @@ -586,7 +604,9 @@ static tsi_result extract_x509_subject_names_from_pem_cert( const unsigned char* pem_cert, size_t pem_cert_size, tsi_peer* peer) { tsi_result result = TSI_OK; X509* cert = NULL; - BIO* pem = BIO_new_mem_buf((void*)pem_cert, pem_cert_size); + BIO* pem; + GPR_ASSERT(pem_cert_size <= INT_MAX); + pem = BIO_new_mem_buf((void*)pem_cert, (int)pem_cert_size); if (pem == NULL) return TSI_OUT_OF_RESOURCES; cert = PEM_read_bio_X509(pem, NULL, NULL, ""); @@ -616,7 +636,7 @@ static tsi_result build_alpn_protocol_name_list( gpr_log(GPR_ERROR, "Invalid 0-length protocol name."); return TSI_INVALID_ARGUMENT; } - *protocol_name_list_length += alpn_protocols_lengths[i] + 1; + *protocol_name_list_length += (size_t)alpn_protocols_lengths[i] + 1; } *protocol_name_list = malloc(*protocol_name_list_length); if (*protocol_name_list == NULL) return TSI_OUT_OF_RESOURCES; @@ -648,17 +668,18 @@ static tsi_result ssl_protector_protect(tsi_frame_protector* self, tsi_result result = TSI_OK; /* First see if we have some pending data in the SSL BIO. */ - size_t pending_in_ssl = BIO_pending(impl->from_ssl); + int pending_in_ssl = BIO_pending(impl->from_ssl); if (pending_in_ssl > 0) { *unprotected_bytes_size = 0; + GPR_ASSERT(*protected_output_frames_size <= INT_MAX); read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames, - *protected_output_frames_size); + (int)*protected_output_frames_size); if (read_from_ssl < 0) { gpr_log(GPR_ERROR, "Could not read from BIO even though some data is pending"); return TSI_INTERNAL_ERROR; } - *protected_output_frames_size = read_from_ssl; + *protected_output_frames_size = (size_t)read_from_ssl; return TSI_OK; } @@ -678,13 +699,14 @@ static tsi_result ssl_protector_protect(tsi_frame_protector* self, result = do_ssl_write(impl->ssl, impl->buffer, impl->buffer_size); if (result != TSI_OK) return result; + GPR_ASSERT(*protected_output_frames_size <= INT_MAX); read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames, - *protected_output_frames_size); + (int)*protected_output_frames_size); if (read_from_ssl < 0) { gpr_log(GPR_ERROR, "Could not read from BIO after SSL_write."); return TSI_INTERNAL_ERROR; } - *protected_output_frames_size = read_from_ssl; + *protected_output_frames_size = (size_t)read_from_ssl; *unprotected_bytes_size = available; impl->buffer_offset = 0; return TSI_OK; @@ -696,6 +718,7 @@ static tsi_result ssl_protector_protect_flush( tsi_result result = TSI_OK; tsi_ssl_frame_protector* impl = (tsi_ssl_frame_protector*)self; int read_from_ssl = 0; + int pending; if (impl->buffer_offset != 0) { result = do_ssl_write(impl->ssl, impl->buffer, impl->buffer_offset); @@ -703,17 +726,22 @@ static tsi_result ssl_protector_protect_flush( impl->buffer_offset = 0; } - *still_pending_size = BIO_pending(impl->from_ssl); + pending = BIO_pending(impl->from_ssl); + GPR_ASSERT(pending >= 0); + *still_pending_size = (size_t)pending; if (*still_pending_size == 0) return TSI_OK; + GPR_ASSERT(*protected_output_frames_size <= INT_MAX); read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames, - *protected_output_frames_size); + (int)*protected_output_frames_size); if (read_from_ssl <= 0) { gpr_log(GPR_ERROR, "Could not read from BIO after SSL_write."); return TSI_INTERNAL_ERROR; } - *protected_output_frames_size = read_from_ssl; - *still_pending_size = BIO_pending(impl->from_ssl); + *protected_output_frames_size = (size_t)read_from_ssl; + pending = BIO_pending(impl->from_ssl); + GPR_ASSERT(pending >= 0); + *still_pending_size = (size_t)pending; return TSI_OK; } @@ -740,14 +768,15 @@ static tsi_result ssl_protector_unprotect( *unprotected_bytes_size = output_bytes_size - output_bytes_offset; /* Then, try to write some data to ssl. */ + GPR_ASSERT(*protected_frames_bytes_size <= INT_MAX); written_into_ssl = BIO_write(impl->into_ssl, protected_frames_bytes, - *protected_frames_bytes_size); + (int)*protected_frames_bytes_size); if (written_into_ssl < 0) { gpr_log(GPR_ERROR, "Sending protected frame to ssl failed with %d", written_into_ssl); return TSI_INTERNAL_ERROR; } - *protected_frames_bytes_size = written_into_ssl; + *protected_frames_bytes_size = (size_t)written_into_ssl; /* Now try to read some data again. */ result = do_ssl_read(impl->ssl, unprotected_bytes, unprotected_bytes_size); @@ -781,7 +810,8 @@ static tsi_result ssl_handshaker_get_bytes_to_send_to_peer(tsi_handshaker* self, *bytes_size > INT_MAX) { return TSI_INVALID_ARGUMENT; } - bytes_read_from_ssl = BIO_read(impl->from_ssl, bytes, *bytes_size); + GPR_ASSERT(*bytes_size <= INT_MAX); + bytes_read_from_ssl = BIO_read(impl->from_ssl, bytes, (int)*bytes_size); if (bytes_read_from_ssl < 0) { *bytes_size = 0; if (!BIO_should_retry(impl->from_ssl)) { @@ -811,13 +841,15 @@ static tsi_result ssl_handshaker_process_bytes_from_peer( if (bytes == NULL || bytes_size == 0 || *bytes_size > INT_MAX) { return TSI_INVALID_ARGUMENT; } - bytes_written_into_ssl_size = BIO_write(impl->into_ssl, bytes, *bytes_size); + GPR_ASSERT(*bytes_size <= INT_MAX); + bytes_written_into_ssl_size = + BIO_write(impl->into_ssl, bytes, (int)*bytes_size); if (bytes_written_into_ssl_size < 0) { gpr_log(GPR_ERROR, "Could not write to memory BIO."); impl->result = TSI_INTERNAL_ERROR; return impl->result; } - *bytes_size = bytes_written_into_ssl_size; + *bytes_size = (size_t)bytes_written_into_ssl_size; if (!tsi_handshaker_is_in_progress(self)) { impl->result = TSI_OK; @@ -1033,9 +1065,9 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client, static int select_protocol_list(const unsigned char** out, unsigned char* outlen, const unsigned char* client_list, - unsigned int client_list_len, + size_t client_list_len, const unsigned char* server_list, - unsigned int server_list_len) { + size_t server_list_len) { const unsigned char* client_current = client_list; while ((unsigned int)(client_current - client_list) < client_list_len) { unsigned char client_current_len = *(client_current++); @@ -1208,7 +1240,8 @@ static int server_handshaker_factory_npn_advertised_callback( tsi_ssl_server_handshaker_factory* factory = (tsi_ssl_server_handshaker_factory*)arg; *out = factory->alpn_protocol_list; - *outlen = factory->alpn_protocol_list_length; + GPR_ASSERT(factory->alpn_protocol_list_length <= UINT_MAX); + *outlen = (unsigned int)factory->alpn_protocol_list_length; return SSL_TLSEXT_ERR_OK; } @@ -1266,8 +1299,10 @@ tsi_result tsi_create_ssl_client_handshaker_factory( break; } #if TSI_OPENSSL_ALPN_SUPPORT - if (SSL_CTX_set_alpn_protos(ssl_context, impl->alpn_protocol_list, - impl->alpn_protocol_list_length)) { + GPR_ASSERT(impl->alpn_protocol_list_length < UINT_MAX); + if (SSL_CTX_set_alpn_protos( + ssl_context, impl->alpn_protocol_list, + (unsigned int)impl->alpn_protocol_list_length)) { gpr_log(GPR_ERROR, "Could not set alpn protocol list to context."); result = TSI_INVALID_ARGUMENT; break; diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index d67205e822..a3020c342b 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -252,28 +252,36 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -static grpc_server* CreateServer(int max_message_size) { +static grpc_server* CreateServer( + int max_message_size, const grpc_compression_options& compression_options) { + grpc_arg args[2]; + size_t args_idx = 0; if (max_message_size > 0) { - grpc_arg arg; - arg.type = GRPC_ARG_INTEGER; - arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); - arg.value.integer = max_message_size; - grpc_channel_args args = {1, &arg}; - return grpc_server_create(&args, nullptr); - } else { - return grpc_server_create(nullptr, nullptr); + args[args_idx].type = GRPC_ARG_INTEGER; + args[args_idx].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); + args[args_idx].value.integer = max_message_size; + args_idx++; } + + args[args_idx].type = GRPC_ARG_INTEGER; + args[args_idx].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG); + args[args_idx].value.integer = compression_options.enabled_algorithms_bitset; + args_idx++; + + grpc_channel_args channel_args = {args_idx, args}; + return grpc_server_create(&channel_args, nullptr); } Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_message_size) + int max_message_size, + grpc_compression_options compression_options) : max_message_size_(max_message_size), started_(false), shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), has_generic_service_(false), - server_(CreateServer(max_message_size)), + server_(CreateServer(max_message_size, compression_options)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 99bc8147a0..1c7e4e4eb6 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -43,7 +43,9 @@ namespace grpc { ServerBuilder::ServerBuilder() - : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} + : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) { + grpc_compression_options_init(&compression_options_); +} std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { ServerCompletionQueue* cq = new ServerCompletionQueue(); @@ -99,8 +101,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { thread_pool_ = CreateDefaultThreadPool(); thread_pool_owned = true; } - std::unique_ptr<Server> server( - new Server(thread_pool_, thread_pool_owned, max_message_size_)); + std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned, + max_message_size_, + compression_options_)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { grpc_server_register_completion_queue(server->server_, (*cq)->cq(), nullptr); diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 70c0fbcc50..51e0728fb9 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -252,7 +252,7 @@ GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length( if (!ctx->recv_message) { return -1; } - return grpc_byte_buffer_length(ctx->recv_message); + return (gpr_intptr)grpc_byte_buffer_length(ctx->recv_message); } /* diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index 89c0fbf0f3..8a2f2d6283 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -64,7 +64,7 @@ class SphinxDocumentation(setuptools.Command): import sphinx.apidoc metadata = self.distribution.metadata src_dir = os.path.join( - os.getcwd(), self.distribution.package_dir['grpc']) + os.getcwd(), self.distribution.package_dir[''], 'grpc') sys.path.append(src_dir) sphinx.apidoc.main([ '', '--force', '--full', '-H', metadata.name, '-A', metadata.author, diff --git a/src/python/grpcio/grpc/beta/_server.py b/src/python/grpcio/grpc/beta/_server.py index daa42c475a..05b954d186 100644 --- a/src/python/grpcio/grpc/beta/_server.py +++ b/src/python/grpcio/grpc/beta/_server.py @@ -86,13 +86,13 @@ class Server(interfaces.Server): return self._grpc_link.add_port( address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access - def start(self): + def _start(self): self._grpc_link.join_link(self._end_link) self._end_link.join_link(self._grpc_link) self._grpc_link.start() self._end_link.start() - def stop(self, grace): + def _stop(self, grace): stop_event = threading.Event() if 0 < grace: disassembly_thread = threading.Thread( @@ -105,6 +105,20 @@ class Server(interfaces.Server): _disassemble(self._grpc_link, self._end_link, self._pool, stop_event, 0) return stop_event + def start(self): + self._start() + + def stop(self, grace): + return self._stop(grace) + + def __enter__(self): + self._start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._stop(0).wait() + return False + def server( implementations, multi_implementation, request_deserializers, diff --git a/src/python/grpcio/grpc/beta/_stub.py b/src/python/grpcio/grpc/beta/_stub.py index cfbecb852b..11dab889cd 100644 --- a/src/python/grpcio/grpc/beta/_stub.py +++ b/src/python/grpcio/grpc/beta/_stub.py @@ -49,6 +49,12 @@ class _AutoIntermediary(object): def __getattr__(self, attr): return getattr(self._delegate, attr) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return False + def __del__(self): self._on_deletion() diff --git a/src/python/grpcio/requirements.txt b/src/python/grpcio/requirements.txt index 43395df03b..608ba402e0 100644 --- a/src/python/grpcio/requirements.txt +++ b/src/python/grpcio/requirements.txt @@ -1,3 +1,2 @@ enum34==1.0.4 futures==2.2.0 -protobuf==3.0.0a3 diff --git a/src/python/grpcio/setup.py b/src/python/grpcio/setup.py index 4735ce080b..151b2bfcb4 100644 --- a/src/python/grpcio/setup.py +++ b/src/python/grpcio/setup.py @@ -104,7 +104,7 @@ _COMMAND_CLASS = { setuptools.setup( name='grpcio', - version='0.11.0', + version='0.11.0b0', ext_modules=_EXTENSION_MODULES, packages=list(_PACKAGES), package_dir=_PACKAGE_DIRECTORIES, diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index fcde0dab8c..35253ba312 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -51,7 +51,7 @@ _PACKAGE_DIRECTORIES = { } _INSTALL_REQUIRES = ( - 'grpcio>=0.10.0a0', + 'grpcio>=0.11.0b0', ) _SETUP_REQUIRES = _INSTALL_REQUIRES @@ -63,7 +63,7 @@ _COMMAND_CLASS = { setuptools.setup( name='grpcio_health_checking', - version='0.10.0a0', + version='0.11.0b0', packages=list(_PACKAGES), package_dir=_PACKAGE_DIRECTORIES, install_requires=_INSTALL_REQUIRES, diff --git a/src/python/grpcio_test/requirements.txt b/src/python/grpcio_test/requirements.txt index 856198def5..fea80ca07f 100644 --- a/src/python/grpcio_test/requirements.txt +++ b/src/python/grpcio_test/requirements.txt @@ -1,5 +1,6 @@ +grpcio>=0.11.0b0 +oauth2client>=1.4.7 +protobuf>=3.0.0a3 pytest>=2.6 pytest-cov>=2.0 pytest-xdist>=1.11 -oauth2client>=1.4.7 -grpcio>=0.10.0a0 diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py index 802dd1e53a..216119f0e7 100644 --- a/src/python/grpcio_test/setup.py +++ b/src/python/grpcio_test/setup.py @@ -71,7 +71,7 @@ _SETUP_REQUIRES = ( _INSTALL_REQUIRES = ( 'oauth2client>=1.4.7', - 'grpcio>=0.10.0a0', + 'grpcio>=0.11.0b0', ) _COMMAND_CLASS = { @@ -80,7 +80,7 @@ _COMMAND_CLASS = { setuptools.setup( name='grpcio_test', - version='0.10.0a0', + version='0.11.0b0', packages=_PACKAGES, package_dir=_PACKAGE_DIRECTORIES, package_data=_PACKAGE_DATA, |