diff options
Diffstat (limited to 'src/core')
159 files changed, 1995 insertions, 566 deletions
diff --git a/src/core/channel/census_filter.h b/src/core/channel/census_filter.h index 4f9759f0db..1453c05d28 100644 --- a/src/core/channel/census_filter.h +++ b/src/core/channel/census_filter.h @@ -41,4 +41,4 @@ extern const grpc_channel_filter grpc_client_census_filter; extern const grpc_channel_filter grpc_server_census_filter; -#endif /* GRPC_INTERNAL_CORE_CHANNEL_CENSUS_FILTER_H */ +#endif /* GRPC_INTERNAL_CORE_CHANNEL_CENSUS_FILTER_H */ diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index c430b56fa2..54ee75af28 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> +#include <grpc/support/useful.h> #include <string.h> @@ -146,3 +147,65 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm( tmp.value.integer = algorithm; return grpc_channel_args_copy_and_add(a, &tmp, 1); } + +/** Returns 1 if the argument for compression algorithm's enabled states bitset + * was found in \a a, returning the arg's value in \a states. Otherwise, returns + * 0. */ +static int find_compression_algorithm_states_bitset( + const grpc_channel_args *a, int **states_arg) { + if (a != NULL) { + size_t i; + for (i = 0; i < a->num_args; ++i) { + if (a->args[i].type == GRPC_ARG_INTEGER && + !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) { + *states_arg = &a->args[i].value.integer; + return 1; /* GPR_TRUE */ + } + } + } + return 0; /* GPR_FALSE */ +} + +grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( + grpc_channel_args **a, + grpc_compression_algorithm algorithm, + int state) { + int *states_arg; + grpc_channel_args *result = *a; + const int states_arg_found = + find_compression_algorithm_states_bitset(*a, &states_arg); + + if (states_arg_found) { + if (state != 0) { + GPR_BITSET(states_arg, algorithm); + } else { + GPR_BITCLEAR(states_arg, algorithm); + } + } else { + /* create a new arg */ + grpc_arg tmp; + tmp.type = GRPC_ARG_INTEGER; + tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG; + /* all enabled by default */ + tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; + if (state != 0) { + GPR_BITSET(&tmp.value.integer, algorithm); + } else { + GPR_BITCLEAR(&tmp.value.integer, algorithm); + } + result = grpc_channel_args_copy_and_add(*a, &tmp, 1); + grpc_channel_args_destroy(*a); + *a = result; + } + return result; +} + +int grpc_channel_args_compression_algorithm_get_states( + const grpc_channel_args *a) { + int *states_arg; + if (find_compression_algorithm_states_bitset(a, &states_arg)) { + return *states_arg; + } else { + return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */ + } +} diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h index 7e6ddd3997..06a6012dee 100644 --- a/src/core/channel/channel_args.h +++ b/src/core/channel/channel_args.h @@ -67,4 +67,24 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( grpc_channel_args *grpc_channel_args_set_compression_algorithm( grpc_channel_args *a, grpc_compression_algorithm algorithm); +/** Sets the support for the given compression algorithm. By default, all + * compression algorithms are enabled. It's an error to disable an algorithm set + * by grpc_channel_args_set_compression_algorithm. + * + * Returns an instance will the updated algorithm states. The \a a pointer is + * modified to point to the returned instance (which may be different from the + * input value of \a a). */ +grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( + grpc_channel_args **a, + grpc_compression_algorithm algorithm, + int enabled); + +/** Returns the bitset representing the support state (true for enabled, false + * for disabled) for compression algorithms. + * + * The i-th bit of the returned bitset corresponds to the i-th entry in the + * grpc_compression_algorithm enum. */ +int grpc_channel_args_compression_algorithm_get_states( + const grpc_channel_args *a); + #endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 6c2e6b38a8..2e25033813 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -84,8 +84,10 @@ typedef struct { grpc_pollset_set pollset_set; } channel_data; -/** We create one watcher for each new lb_policy that is returned from a resolver, - to watch for state changes from the lb_policy. When a state change is seen, we +/** We create one watcher for each new lb_policy that is returned from a + resolver, + to watch for state changes from the lb_policy. When a state change is seen, + we update the channel, and create a new watcher */ typedef struct { channel_data *chand; @@ -380,7 +382,8 @@ static void perform_transport_stream_op(grpc_call_element *elem, if (lb_policy) { grpc_transport_stream_op *op = &calld->waiting_op; grpc_pollset *bind_pollset = op->bind_pollset; - grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata; + grpc_metadata_batch *initial_metadata = + &op->send_ops->ops[0].data.metadata; GRPC_LB_POLICY_REF(lb_policy, "pick"); gpr_mu_unlock(&chand->mu_config); calld->state = CALL_WAITING_FOR_PICK; @@ -388,13 +391,14 @@ static void perform_transport_stream_op(grpc_call_element *elem, GPR_ASSERT(op->bind_pollset); GPR_ASSERT(op->send_ops); GPR_ASSERT(op->send_ops->nops >= 1); - GPR_ASSERT( - op->send_ops->ops[0].type == GRPC_OP_METADATA); + GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld); + grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, + calld); grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata, - &calld->picked_channel, &calld->async_setup_task); + &calld->picked_channel, + &calld->async_setup_task); GRPC_LB_POLICY_UNREF(lb_policy, "pick"); } else if (chand->resolver != NULL) { @@ -430,7 +434,8 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, perform_transport_stream_op(elem, op, 0); } -static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state); +static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state); static void on_lb_policy_state_changed(void *arg, int iomgr_success) { lb_policy_connectivity_watcher *w = arg; @@ -450,7 +455,8 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success) { gpr_free(w); } -static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) { +static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); @@ -499,13 +505,13 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { if (iomgr_success && chand->resolver) { grpc_resolver *resolver = chand->resolver; GRPC_RESOLVER_REF(resolver, "channel-next"); + grpc_connectivity_state_set(&chand->state_tracker, state, + "new_lb+resolver"); gpr_mu_unlock(&chand->mu_config); GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(resolver, "channel-next"); - grpc_connectivity_state_set(&chand->state_tracker, state, - "new_lb+resolver"); if (lb_policy != NULL) { watch_lb_policy(chand, lb_policy, state); } @@ -663,7 +669,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); - grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); + grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, + "client_channel"); } /* Destructor for channel_data */ @@ -747,19 +754,20 @@ void grpc_client_channel_watch_connectivity_state( gpr_mu_unlock(&chand->mu_config); } -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) { +grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( + grpc_channel_element *elem) { channel_data *chand = elem->channel_data; return &chand->pollset_set; } void grpc_client_channel_add_interested_party(grpc_channel_element *elem, - grpc_pollset *pollset) { + grpc_pollset *pollset) { channel_data *chand = elem->channel_data; grpc_pollset_set_add_pollset(&chand->pollset_set, pollset); } void grpc_client_channel_del_interested_party(grpc_channel_element *elem, - grpc_pollset *pollset) { + grpc_pollset *pollset) { channel_data *chand = elem->channel_data; grpc_pollset_set_del_pollset(&chand->pollset_set, pollset); } diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index cd81294eb3..13681e3956 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -59,11 +59,12 @@ void grpc_client_channel_watch_connectivity_state( grpc_channel_element *elem, grpc_connectivity_state *state, grpc_iomgr_closure *on_complete); -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem); +grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( + grpc_channel_element *elem); void grpc_client_channel_add_interested_party(grpc_channel_element *channel, - grpc_pollset *pollset); + grpc_pollset *pollset); void grpc_client_channel_del_interested_party(grpc_channel_element *channel, - grpc_pollset *pollset); + grpc_pollset *pollset); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 8963c13b0f..762a4edc73 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -35,22 +35,25 @@ #include <string.h> #include <grpc/compression.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice_buffer.h> #include "src/core/channel/compress_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/compression/message_compress.h" +#include "src/core/support/string.h" typedef struct call_data { gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; + grpc_linked_mdelem accept_encoding_storage; int remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */ int written_initial_metadata; /**< Already processed initial md? */ /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ grpc_compression_algorithm compression_algorithm; - /** If true, contents of \a compression_algorithm are authoritative */ + /** If true, contents of \a compression_algorithm are authoritative */ int has_compression_algorithm; } call_data; @@ -59,8 +62,12 @@ typedef struct channel_data { grpc_mdstr *mdstr_request_compression_algorithm_key; /** Metadata key for the outgoing (used) compression algorithm */ grpc_mdstr *mdstr_outgoing_compression_algorithm_key; + /** Metadata key for the accepted encodings */ + grpc_mdstr *mdstr_compression_capabilities_key; /** Precomputed metadata elements for all available compression algorithms */ grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT]; + /** Precomputed metadata elements for the accepted encodings */ + grpc_mdelem *mdelem_accept_encoding; /** The default, channel-level, compression algorithm */ grpc_compression_algorithm default_compression_algorithm; } channel_data; @@ -71,7 +78,7 @@ typedef struct channel_data { * * Returns 1 if the data was actually compress and 0 otherwise. */ static int compress_send_sb(grpc_compression_algorithm algorithm, - gpr_slice_buffer *slices) { + gpr_slice_buffer *slices) { int did_compress; gpr_slice_buffer tmp; gpr_slice_buffer_init(&tmp); @@ -86,14 +93,14 @@ static int compress_send_sb(grpc_compression_algorithm algorithm, /** For each \a md element from the incoming metadata, filter out the entry for * "grpc-encoding", using its value to populate the call data's * compression_algorithm field. */ -static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; if (md->key == channeld->mdstr_request_compression_algorithm_key) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); - if (!grpc_compression_algorithm_parse(md_c_str, + if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), &calld->compression_algorithm)) { gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.", md_c_str); @@ -108,10 +115,10 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) { static int skip_compression(channel_data *channeld, call_data *calld) { if (calld->has_compression_algorithm) { - if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { - return 1; - } - return 0; /* we have an actual call-specific algorithm */ + if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { + return 1; + } + return 0; /* we have an actual call-specific algorithm */ } /* no per-call compression override */ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; @@ -184,7 +191,7 @@ static void process_send_ops(grpc_call_element *elem, * given by GRPC_OP_BEGIN_MESSAGE) */ calld->remaining_slice_bytes = sop->data.begin_message.length; if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) { - calld->has_compression_algorithm = 1; /* GPR_TRUE */ + calld->has_compression_algorithm = 1; /* GPR_TRUE */ calld->compression_algorithm = GRPC_COMPRESS_NONE; } break; @@ -202,10 +209,17 @@ static void process_send_ops(grpc_call_element *elem, channeld->default_compression_algorithm; calld->has_compression_algorithm = 1; /* GPR_TRUE */ } + /* hint compression algorithm */ grpc_metadata_batch_add_tail( &(sop->data.metadata), &calld->compression_algorithm_storage, GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms [calld->compression_algorithm])); + + /* convey supported compression algorithms */ + grpc_metadata_batch_add_tail( + &(sop->data.metadata), &calld->accept_encoding_storage, + GRPC_MDELEM_REF(channeld->mdelem_accept_encoding)); + calld->written_initial_metadata = 1; /* GPR_TRUE */ } break; @@ -279,6 +293,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, int is_first, int is_last) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; + const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; + char *accept_encoding_str; + size_t accept_encoding_str_len; channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm(args); @@ -289,6 +306,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channeld->mdstr_outgoing_compression_algorithm_key = grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); + channeld->mdstr_compression_capabilities_key = + grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0); + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { char *algorithm_name; GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); @@ -297,8 +317,22 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, mdctx, GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string(mdctx, algorithm_name, 0)); + if (algo_idx > 0) { + supported_algorithms_names[algo_idx - 1] = algorithm_name; + } } + /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated + * arrays, as to avoid the heap allocs */ + accept_encoding_str = gpr_strjoin_sep( + supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names), + ", ", &accept_encoding_str_len); + + channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( + mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), + grpc_mdstr_from_string(mdctx, accept_encoding_str, 0)); + gpr_free(accept_encoding_str); + GPR_ASSERT(!is_last); } @@ -309,10 +343,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key); GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key); - for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; - ++algo_idx) { + GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key); + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]); } + GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding); } const grpc_channel_filter grpc_compress_filter = { diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h index 0694e2c1dd..0917e81ca4 100644 --- a/src/core/channel/compress_filter.h +++ b/src/core/channel/compress_filter.h @@ -62,4 +62,4 @@ extern const grpc_channel_filter grpc_compress_filter; -#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */ +#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */ diff --git a/src/core/channel/http_client_filter.h b/src/core/channel/http_client_filter.h index 04eb839e00..21c66b9b8e 100644 --- a/src/core/channel/http_client_filter.h +++ b/src/core/channel/http_client_filter.h @@ -41,4 +41,4 @@ extern const grpc_channel_filter grpc_http_client_filter; #define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme" -#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_CLIENT_FILTER_H */ +#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_CLIENT_FILTER_H */ diff --git a/src/core/channel/http_server_filter.h b/src/core/channel/http_server_filter.h index 42f76ed17f..f219d4e66f 100644 --- a/src/core/channel/http_server_filter.h +++ b/src/core/channel/http_server_filter.h @@ -39,4 +39,4 @@ /* Processes metadata on the client side for HTTP2 transports */ extern const grpc_channel_filter grpc_http_server_filter; -#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_SERVER_FILTER_H */ +#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_SERVER_FILTER_H */ diff --git a/src/core/channel/noop_filter.h b/src/core/channel/noop_filter.h index 96463e5322..ded9b33117 100644 --- a/src/core/channel/noop_filter.h +++ b/src/core/channel/noop_filter.h @@ -41,4 +41,4 @@ customize for their own filters */ extern const grpc_channel_filter grpc_no_op_filter; -#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */ +#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 827b1a2be5..7b35b7902f 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -219,7 +219,8 @@ static grpc_resolver *dns_create( default_host_arg.type = GRPC_ARG_STRING; default_host_arg.key = GRPC_ARG_DEFAULT_AUTHORITY; default_host_arg.value.string = host; - subchannel_factory = grpc_subchannel_factory_add_channel_arg(subchannel_factory, &default_host_arg); + subchannel_factory = grpc_subchannel_factory_add_channel_arg( + subchannel_factory, &default_host_arg); gpr_free(host); gpr_free(port); diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 74584e7e2c..8419873908 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -60,9 +60,12 @@ typedef struct { grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, size_t num_subchannels); - /** the address that we've 'resolved' */ - struct sockaddr_storage addr; - int addr_len; + /** the addresses that we've 'resolved' */ + struct sockaddr_storage *addrs; + /** the corresponding length of the addresses */ + int *addrs_len; + /** how many elements in \a addrs */ + size_t num_addrs; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -119,17 +122,22 @@ static void sockaddr_next(grpc_resolver *resolver, static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { grpc_client_config *cfg; grpc_lb_policy *lb_policy; - grpc_subchannel *subchannel; + grpc_subchannel **subchannels; grpc_subchannel_args args; if (r->next_completion != NULL && !r->published) { + size_t i; cfg = grpc_client_config_create(); - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)&r->addr; - args.addr_len = r->addr_len; - subchannel = - grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args); - lb_policy = r->lb_policy_factory(&subchannel, 1); + subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs); + for (i = 0; i < r->num_addrs; i++) { + memset(&args, 0, sizeof(args)); + args.addr = (struct sockaddr *)&r->addrs[i]; + args.addr_len = r->addrs_len[i]; + subchannels[i] = grpc_subchannel_factory_create_subchannel( + r->subchannel_factory, &args); + } + lb_policy = r->lb_policy_factory(subchannels, r->num_addrs); + gpr_free(subchannels); grpc_client_config_set_lb_policy(cfg, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "unix"); r->published = 1; @@ -143,6 +151,8 @@ static void sockaddr_destroy(grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(r->subchannel_factory); + gpr_free(r->addrs); + gpr_free(r->addrs_len); gpr_free(r); } @@ -238,13 +248,18 @@ done: return result; } +static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( grpc_uri *uri, grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, size_t num_subchannels), grpc_subchannel_factory *subchannel_factory, int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) { + size_t i; + int errors_found = 0; /* GPR_FALSE */ sockaddr_resolver *r; + gpr_slice path_slice; + gpr_slice_buffer path_parts; if (0 != strcmp(uri->authority, "")) { gpr_log(GPR_ERROR, "authority based uri's not supported"); @@ -253,7 +268,29 @@ static grpc_resolver *sockaddr_create( r = gpr_malloc(sizeof(sockaddr_resolver)); memset(r, 0, sizeof(*r)); - if (!parse(uri, &r->addr, &r->addr_len)) { + + path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing); + gpr_slice_buffer_init(&path_parts); + + gpr_slice_split(path_slice, ",", &path_parts); + r->num_addrs = path_parts.count; + r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs); + r->addrs_len = gpr_malloc(sizeof(int) * r->num_addrs); + + for(i = 0; i < r->num_addrs; i++) { + grpc_uri ith_uri = *uri; + char* part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); + ith_uri.path = part_str; + if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) { + errors_found = 1; /* GPR_TRUE */ + } + gpr_free(part_str); + if (errors_found) break; + } + + gpr_slice_buffer_destroy(&path_parts); + gpr_slice_unref(path_slice); + if (errors_found) { gpr_free(r); return NULL; } diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c new file mode 100644 index 0000000000..acb2ba136e --- /dev/null +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -0,0 +1,501 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/resolvers/zookeeper_resolver.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include <grpc/grpc_zookeeper.h> +#include <zookeeper/zookeeper.h> + +#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/resolver_registry.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/support/string.h" +#include "src/core/json/json.h" + +/** Zookeeper session expiration time in milliseconds */ +#define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000 + +typedef struct { + /** base class: must be first */ + grpc_resolver base; + /** refcount */ + gpr_refcount refs; + /** name to resolve */ + char *name; + /** subchannel factory */ + grpc_subchannel_factory *subchannel_factory; + /** load balancing policy factory */ + grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, + size_t num_subchannels); + + /** mutex guarding the rest of the state */ + gpr_mu mu; + /** are we currently resolving? */ + int resolving; + /** which version of resolved_config have we published? */ + int published_version; + /** which version of resolved_config is current? */ + int resolved_version; + /** pending next completion, or NULL */ + grpc_iomgr_closure *next_completion; + /** target config address for next completion */ + grpc_client_config **target_config; + /** current (fully resolved) config */ + grpc_client_config *resolved_config; + + /** zookeeper handle */ + zhandle_t *zookeeper_handle; + /** zookeeper resolved addresses */ + grpc_resolved_addresses *resolved_addrs; + /** total number of addresses to be resolved */ + int resolved_total; + /** number of addresses resolved */ + int resolved_num; +} zookeeper_resolver; + +static void zookeeper_destroy(grpc_resolver *r); + +static void zookeeper_start_resolving_locked(zookeeper_resolver *r); +static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r); + +static void zookeeper_shutdown(grpc_resolver *r); +static void zookeeper_channel_saw_error(grpc_resolver *r, + struct sockaddr *failing_address, + int failing_address_len); +static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config, + grpc_iomgr_closure *on_complete); + +static const grpc_resolver_vtable zookeeper_resolver_vtable = { + zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, + zookeeper_next}; + +static void zookeeper_shutdown(grpc_resolver *resolver) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->next_completion != NULL) { + *r->target_config = NULL; + grpc_iomgr_add_callback(r->next_completion); + r->next_completion = NULL; + } + zookeeper_close(r->zookeeper_handle); + gpr_mu_unlock(&r->mu); +} + +static void zookeeper_channel_saw_error(grpc_resolver *resolver, + struct sockaddr *sa, int len) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); +} + +static void zookeeper_next(grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_iomgr_closure *on_complete) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->next_completion == NULL); + r->next_completion = on_complete; + r->target_config = target_config; + if (r->resolved_version == 0 && r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } else { + zookeeper_maybe_finish_next_locked(r); + } + gpr_mu_unlock(&r->mu); +} + +/** Zookeeper global watcher for connection management + TODO: better connection management besides logs */ +static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type, + int state, const char *path, + void *watcher_ctx) { + if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_EXPIRED_SESSION_STATE) { + gpr_log(GPR_ERROR, "Zookeeper session expired"); + } else if (state == ZOO_AUTH_FAILED_STATE) { + gpr_log(GPR_ERROR, "Zookeeper authentication failed"); + } + } +} + +/** Zookeeper watcher triggered by changes to watched nodes + Once triggered, it tries to resolve again to get updated addresses */ +static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, + const char *path, void *watcher_ctx) { + if (watcher_ctx != NULL) { + zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx; + if (state == ZOO_CONNECTED_STATE) { + gpr_mu_lock(&r->mu); + if (r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); + } + } +} + +/** Callback function after getting all resolved addresses + Creates a subchannel for each address */ +static void zookeeper_on_resolved(void *arg, + grpc_resolved_addresses *addresses) { + zookeeper_resolver *r = arg; + grpc_client_config *config = NULL; + grpc_subchannel **subchannels; + grpc_subchannel_args args; + grpc_lb_policy *lb_policy; + size_t i; + if (addresses != NULL) { + config = grpc_client_config_create(); + subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); + for (i = 0; i < addresses->naddrs; i++) { + memset(&args, 0, sizeof(args)); + args.addr = (struct sockaddr *)(addresses->addrs[i].addr); + args.addr_len = addresses->addrs[i].len; + subchannels[i] = grpc_subchannel_factory_create_subchannel( + r->subchannel_factory, &args); + } + lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs); + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(lb_policy, "construction"); + grpc_resolved_addresses_destroy(addresses); + gpr_free(subchannels); + } + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->resolving == 1); + r->resolving = 0; + if (r->resolved_config != NULL) { + grpc_client_config_unref(r->resolved_config); + } + r->resolved_config = config; + r->resolved_version++; + zookeeper_maybe_finish_next_locked(r); + gpr_mu_unlock(&r->mu); + + GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving"); +} + +/** Callback function for each DNS resolved address */ +static void zookeeper_dns_resolved(void *arg, + grpc_resolved_addresses *addresses) { + size_t i; + zookeeper_resolver *r = arg; + int resolve_done = 0; + + gpr_mu_lock(&r->mu); + r->resolved_num++; + r->resolved_addrs->addrs = + gpr_realloc(r->resolved_addrs->addrs, + sizeof(grpc_resolved_address) * + (r->resolved_addrs->naddrs + addresses->naddrs)); + for (i = 0; i < addresses->naddrs; i++) { + memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr, + addresses->addrs[i].addr, addresses->addrs[i].len); + r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len = + addresses->addrs[i].len; + } + + r->resolved_addrs->naddrs += addresses->naddrs; + grpc_resolved_addresses_destroy(addresses); + + /** Wait for all addresses to be resolved */ + resolve_done = (r->resolved_num == r->resolved_total); + gpr_mu_unlock(&r->mu); + if (resolve_done) { + zookeeper_on_resolved(r, r->resolved_addrs); + } +} + +/** Parses JSON format address of a zookeeper node */ +static char *zookeeper_parse_address(const char *value, int value_len) { + grpc_json *json; + grpc_json *cur; + const char *host; + const char *port; + char *buffer; + char *address = NULL; + + buffer = gpr_malloc(value_len); + memcpy(buffer, value, value_len); + json = grpc_json_parse_string_with_len(buffer, value_len); + if (json != NULL) { + host = NULL; + port = NULL; + for (cur = json->child; cur != NULL; cur = cur->next) { + if (!strcmp(cur->key, "host")) { + host = cur->value; + if (port != NULL) { + break; + } + } else if (!strcmp(cur->key, "port")) { + port = cur->value; + if (host != NULL) { + break; + } + } + } + if (host != NULL && port != NULL) { + gpr_asprintf(&address, "%s:%s", host, port); + } + grpc_json_destroy(json); + } + gpr_free(buffer); + + return address; +} + +static void zookeeper_get_children_node_completion(int rc, const char *value, + int value_len, + const struct Stat *stat, + const void *arg) { + char *address = NULL; + zookeeper_resolver *r = (zookeeper_resolver *)arg; + int resolve_done = 0; + + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name); + return; + } + + address = zookeeper_parse_address(value, value_len); + if (address != NULL) { + /** Further resolves address by DNS */ + grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + gpr_free(address); + } else { + gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name); + gpr_mu_lock(&r->mu); + r->resolved_total--; + resolve_done = (r->resolved_num == r->resolved_total); + gpr_mu_unlock(&r->mu); + if (resolve_done) { + zookeeper_on_resolved(r, r->resolved_addrs); + } + } +} + +static void zookeeper_get_children_completion( + int rc, const struct String_vector *children, const void *arg) { + char *path; + int status; + int i; + zookeeper_resolver *r = (zookeeper_resolver *)arg; + + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + return; + } + + if (children->count == 0) { + gpr_log(GPR_ERROR, "Error in resolving zookeeper address %s", r->name); + return; + } + + r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->resolved_addrs->addrs = NULL; + r->resolved_addrs->naddrs = 0; + r->resolved_total = children->count; + + /** TODO: Replace expensive heap allocation with stack + if we can get maximum length of zookeeper path */ + for (i = 0; i < children->count; i++) { + gpr_asprintf(&path, "%s/%s", r->name, children->data[i]); + status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r, + zookeeper_get_children_node_completion, r); + gpr_free(path); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path); + } + } +} + +static void zookeeper_get_node_completion(int rc, const char *value, + int value_len, + const struct Stat *stat, + const void *arg) { + int status; + char *address = NULL; + zookeeper_resolver *r = (zookeeper_resolver *)arg; + r->resolved_addrs = NULL; + r->resolved_total = 0; + r->resolved_num = 0; + + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); + return; + } + + /** If zookeeper node of path r->name does not have address + (i.e. service node), get its children */ + address = zookeeper_parse_address(value, value_len); + if (address != NULL) { + r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->resolved_addrs->addrs = NULL; + r->resolved_addrs->naddrs = 0; + r->resolved_total = 1; + /** Further resolves address by DNS */ + grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + gpr_free(address); + return; + } + + status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher, + r, zookeeper_get_children_completion, r); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + } +} + +static void zookeeper_resolve_address(zookeeper_resolver *r) { + int status; + status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r, + zookeeper_get_node_completion, r); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); + } +} + +static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { + GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving"); + GPR_ASSERT(r->resolving == 0); + r->resolving = 1; + zookeeper_resolve_address(r); +} + +static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { + if (r->next_completion != NULL && + r->resolved_version != r->published_version) { + *r->target_config = r->resolved_config; + if (r->resolved_config != NULL) { + grpc_client_config_ref(r->resolved_config); + } + grpc_iomgr_add_callback(r->next_completion); + r->next_completion = NULL; + r->published_version = r->resolved_version; + } +} + +static void zookeeper_destroy(grpc_resolver *gr) { + zookeeper_resolver *r = (zookeeper_resolver *)gr; + gpr_mu_destroy(&r->mu); + if (r->resolved_config != NULL) { + grpc_client_config_unref(r->resolved_config); + } + grpc_subchannel_factory_unref(r->subchannel_factory); + gpr_free(r->name); + gpr_free(r); +} + +static grpc_resolver *zookeeper_create( + grpc_uri *uri, + grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, + size_t num_subchannels), + grpc_subchannel_factory *subchannel_factory) { + zookeeper_resolver *r; + size_t length; + char *path = uri->path; + + if (0 == strcmp(uri->authority, "")) { + gpr_log(GPR_ERROR, "No authority specified in zookeeper uri"); + return NULL; + } + + /** Removes the trailing slash if exists */ + length = strlen(path); + if (length > 1 && path[length - 1] == '/') { + path[length - 1] = 0; + } + + r = gpr_malloc(sizeof(zookeeper_resolver)); + memset(r, 0, sizeof(*r)); + gpr_ref_init(&r->refs, 1); + gpr_mu_init(&r->mu); + grpc_resolver_init(&r->base, &zookeeper_resolver_vtable); + r->name = gpr_strdup(path); + + r->subchannel_factory = subchannel_factory; + r->lb_policy_factory = lb_policy_factory; + grpc_subchannel_factory_ref(subchannel_factory); + + /** Initializes zookeeper client */ + zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); + r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_global_watcher, + GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); + if (r->zookeeper_handle == NULL) { + gpr_log(GPR_ERROR, "Unable to connect to zookeeper server"); + return NULL; + } + + return &r->base; +} + +static void zookeeper_plugin_init() { + grpc_register_resolver_type("zookeeper", + grpc_zookeeper_resolver_factory_create()); +} + +void grpc_zookeeper_register() { + grpc_register_plugin(zookeeper_plugin_init, NULL); +} + +/* + * FACTORY + */ + +static void zookeeper_factory_ref(grpc_resolver_factory *factory) {} + +static void zookeeper_factory_unref(grpc_resolver_factory *factory) {} + +static grpc_resolver *zookeeper_factory_create_resolver( + grpc_resolver_factory *factory, grpc_uri *uri, + grpc_subchannel_factory *subchannel_factory) { + return zookeeper_create(uri, grpc_create_pick_first_lb_policy, + subchannel_factory); +} + +static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { + zookeeper_factory_ref, zookeeper_factory_unref, + zookeeper_factory_create_resolver}; +static grpc_resolver_factory zookeeper_resolver_factory = { + &zookeeper_factory_vtable}; + +grpc_resolver_factory *grpc_zookeeper_resolver_factory_create() { + return &zookeeper_resolver_factory; +} diff --git a/src/core/client_config/resolvers/zookeeper_resolver.h b/src/core/client_config/resolvers/zookeeper_resolver.h new file mode 100644 index 0000000000..a6f002dd6d --- /dev/null +++ b/src/core/client_config/resolvers/zookeeper_resolver.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H + +#include "src/core/client_config/resolver_factory.h" + +/** Create a zookeeper resolver factory */ +grpc_resolver_factory *grpc_zookeeper_resolver_factory_create(void); + +#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H */ diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index d1cd33b2af..2e36c69134 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -91,8 +91,10 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, grpc_connectivity_state *state, grpc_iomgr_closure *notify); +/** express interest in \a channel's activities through \a pollset. */ void grpc_subchannel_add_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); +/** stop following \a channel's activity through \a pollset. */ void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); diff --git a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c index 7dc6d99ebe..585e465fa4 100644 --- a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c +++ b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c @@ -35,9 +35,9 @@ #include "src/core/client_config/subchannel_factory_decorators/merge_channel_args.h" grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg( - grpc_subchannel_factory *input, const grpc_arg *arg) { - grpc_channel_args args; - args.num_args = 1; - args.args = (grpc_arg *)arg; - return grpc_subchannel_factory_merge_channel_args(input, &args); + grpc_subchannel_factory *input, const grpc_arg *arg) { + grpc_channel_args args; + args.num_args = 1; + args.args = (grpc_arg *)arg; + return grpc_subchannel_factory_merge_channel_args(input, &args); } diff --git a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h index 1937623374..8457294000 100644 --- a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h +++ b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h @@ -40,6 +40,7 @@ channel_args by adding a new argument; ownership of input, arg is retained by the caller. */ grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg( - grpc_subchannel_factory *input, const grpc_arg *arg); + grpc_subchannel_factory *input, const grpc_arg *arg); -#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H */ +#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H \ + */ diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c index 7e028857ac..c1b5507fde 100644 --- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c +++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c @@ -50,7 +50,7 @@ static void merge_args_factory_ref(grpc_subchannel_factory *scf) { static void merge_args_factory_unref(grpc_subchannel_factory *scf) { merge_args_factory *f = (merge_args_factory *)scf; if (gpr_unref(&f->refs)) { - grpc_subchannel_factory_unref(f->wrapped); + grpc_subchannel_factory_unref(f->wrapped); grpc_channel_args_destroy(f->merge_args); gpr_free(f); } @@ -73,7 +73,7 @@ static const grpc_subchannel_factory_vtable merge_args_factory_vtable = { merge_args_factory_create_subchannel}; grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args( - grpc_subchannel_factory *input, const grpc_channel_args *args) { + grpc_subchannel_factory *input, const grpc_channel_args *args) { merge_args_factory *f = gpr_malloc(sizeof(*f)); f->base.vtable = &merge_args_factory_vtable; gpr_ref_init(&f->refs, 1); diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h index 73a03b752f..f4757f0650 100644 --- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h +++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h @@ -40,6 +40,7 @@ channel_args by adding a new argument; ownership of input, args is retained by the caller. */ grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args( - grpc_subchannel_factory *input, const grpc_channel_args *args); + grpc_subchannel_factory *input, const grpc_channel_args *args); -#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H */ +#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H \ + */ diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index 0fd028741e..6ed6dbe93f 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -35,13 +35,20 @@ #include <string.h> #include <grpc/compression.h> -int grpc_compression_algorithm_parse(const char* name, +int grpc_compression_algorithm_parse(const char *name, size_t name_length, grpc_compression_algorithm *algorithm) { - if (strcmp(name, "identity") == 0) { + /* we use strncmp not only because it's safer (even though in this case it + * doesn't matter, given that we are comparing against string literals, but + * because this way we needn't have "name" nil-terminated (useful for slice + * data, for example) */ + if (name_length == 0) { + return 0; + } + if (strncmp(name, "identity", name_length) == 0) { *algorithm = GRPC_COMPRESS_NONE; - } else if (strcmp(name, "gzip") == 0) { + } else if (strncmp(name, "gzip", name_length) == 0) { *algorithm = GRPC_COMPRESS_GZIP; - } else if (strcmp(name, "deflate") == 0) { + } else if (strncmp(name, "deflate", name_length) == 0) { *algorithm = GRPC_COMPRESS_DEFLATE; } else { return 0; diff --git a/src/core/debug/trace.c b/src/core/debug/trace.c index b53dfe804b..1014b1f4db 100644 --- a/src/core/debug/trace.c +++ b/src/core/debug/trace.c @@ -61,8 +61,8 @@ static void add(const char *beg, const char *end, char ***ss, size_t *ns) { size_t np = n + 1; char *s = gpr_malloc(end - beg + 1); memcpy(s, beg, end - beg); - s[end-beg] = 0; - *ss = gpr_realloc(*ss, sizeof(char**) * np); + s[end - beg] = 0; + *ss = gpr_realloc(*ss, sizeof(char **) * np); (*ss)[n] = s; *ns = np; } @@ -73,7 +73,7 @@ static void split(const char *s, char ***ss, size_t *ns) { add(s, s + strlen(s), ss, ns); } else { add(s, c, ss, ns); - split(c+1, ss, ns); + split(c + 1, ss, ns); } } @@ -125,7 +125,7 @@ int grpc_tracer_set_enabled(const char *name, int enabled) { } if (!found) { gpr_log(GPR_ERROR, "Unknown trace var: '%s'", name); - return 0; /* early return */ + return 0; /* early return */ } } return 1; diff --git a/src/core/debug/trace.h b/src/core/debug/trace.h index fc8615bc69..dc5875976e 100644 --- a/src/core/debug/trace.h +++ b/src/core/debug/trace.h @@ -40,4 +40,4 @@ void grpc_register_tracer(const char *name, int *flag); void grpc_tracer_init(const char *env_var_name); void grpc_tracer_shutdown(void); -#endif /* GRPC_INTERNAL_CORE_DEBUG_TRACE_H */ +#endif /* GRPC_INTERNAL_CORE_DEBUG_TRACE_H */ diff --git a/src/core/httpcli/format_request.c b/src/core/httpcli/format_request.c index e875423e87..6189fce86b 100644 --- a/src/core/httpcli/format_request.c +++ b/src/core/httpcli/format_request.c @@ -43,7 +43,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> -static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec *buf) { +static void fill_common_header(const grpc_httpcli_request *request, + gpr_strvec *buf) { size_t i; gpr_strvec_add(buf, gpr_strdup(request->path)); gpr_strvec_add(buf, gpr_strdup(" HTTP/1.0\r\n")); @@ -52,7 +53,8 @@ static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec * gpr_strvec_add(buf, gpr_strdup(request->host)); gpr_strvec_add(buf, gpr_strdup("\r\n")); gpr_strvec_add(buf, gpr_strdup("Connection: close\r\n")); - gpr_strvec_add(buf, gpr_strdup("User-Agent: "GRPC_HTTPCLI_USER_AGENT"\r\n")); + gpr_strvec_add(buf, + gpr_strdup("User-Agent: " GRPC_HTTPCLI_USER_AGENT "\r\n")); /* user supplied headers */ for (i = 0; i < request->hdr_count; i++) { gpr_strvec_add(buf, gpr_strdup(request->hdrs[i].key)); diff --git a/src/core/httpcli/format_request.h b/src/core/httpcli/format_request.h index 8bfb20bfd0..c8dc8f7d4e 100644 --- a/src/core/httpcli/format_request.h +++ b/src/core/httpcli/format_request.h @@ -42,4 +42,4 @@ gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request, const char *body_bytes, size_t body_size); -#endif /* GRPC_INTERNAL_CORE_HTTPCLI_FORMAT_REQUEST_H */ +#endif /* GRPC_INTERNAL_CORE_HTTPCLI_FORMAT_REQUEST_H */ diff --git a/src/core/httpcli/parser.h b/src/core/httpcli/parser.h index 71280e7479..3fbb4c7479 100644 --- a/src/core/httpcli/parser.h +++ b/src/core/httpcli/parser.h @@ -61,4 +61,4 @@ void grpc_httpcli_parser_destroy(grpc_httpcli_parser *parser); int grpc_httpcli_parser_parse(grpc_httpcli_parser *parser, gpr_slice slice); int grpc_httpcli_parser_eof(grpc_httpcli_parser *parser); -#endif /* GRPC_INTERNAL_CORE_HTTPCLI_PARSER_H */ +#endif /* GRPC_INTERNAL_CORE_HTTPCLI_PARSER_H */ diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 68d33b9cf6..ddb30dc4bb 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -105,8 +105,7 @@ void grpc_alarm_list_init(gpr_timespec now) { void grpc_alarm_list_shutdown(void) { int i; - while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL, - 0)) + while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL, 0)) ; for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -362,7 +361,7 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); return run_some_expired_alarms( - drop_mu, now, next, + drop_mu, now, next, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); } diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h index c067a0b8a3..4a13527e64 100644 --- a/src/core/iomgr/alarm.h +++ b/src/core/iomgr/alarm.h @@ -86,4 +86,4 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, Requires: cancel() must happen after add() on a given alarm */ void grpc_alarm_cancel(grpc_alarm *alarm); -#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_H */ diff --git a/src/core/iomgr/alarm_heap.c b/src/core/iomgr/alarm_heap.c index d912178fda..daed251982 100644 --- a/src/core/iomgr/alarm_heap.c +++ b/src/core/iomgr/alarm_heap.c @@ -66,11 +66,11 @@ static void adjust_downwards(grpc_alarm **first, int i, int length, int next_i; if (left_child >= length) break; right_child = left_child + 1; - next_i = - right_child < length && gpr_time_cmp(first[left_child]->deadline, - first[right_child]->deadline) < 0 - ? right_child - : left_child; + next_i = right_child < length && + gpr_time_cmp(first[left_child]->deadline, + first[right_child]->deadline) < 0 + ? right_child + : left_child; if (gpr_time_cmp(t->deadline, first[next_i]->deadline) >= 0) break; first[i] = first[next_i]; first[i]->heap_index = i; diff --git a/src/core/iomgr/alarm_heap.h b/src/core/iomgr/alarm_heap.h index c5adfc6d31..60db6c991b 100644 --- a/src/core/iomgr/alarm_heap.h +++ b/src/core/iomgr/alarm_heap.h @@ -54,4 +54,4 @@ void grpc_alarm_heap_pop(grpc_alarm_heap *heap); int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap); -#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_HEAP_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_HEAP_H */ diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h index 0268a01bad..e9f98a3444 100644 --- a/src/core/iomgr/alarm_internal.h +++ b/src/core/iomgr/alarm_internal.h @@ -59,4 +59,4 @@ gpr_timespec grpc_alarm_list_next_timeout(void); void grpc_kick_poller(void); -#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_INTERNAL_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_INTERNAL_H */ diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 744fe7656c..8ee14bce9b 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -50,7 +50,8 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { ep->vtable->add_to_pollset(ep, pollset); } -void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) { +void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, + grpc_pollset_set *pollset_set) { ep->vtable->add_to_pollset_set(ep, pollset_set); } diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index a2216925f9..ea92a500e8 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -103,10 +103,11 @@ void grpc_endpoint_destroy(grpc_endpoint *ep); /* Add an endpoint to a pollset, so that when the pollset is polled, events from this endpoint are considered */ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset); -void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set); +void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, + grpc_pollset_set *pollset_set); struct grpc_endpoint { const grpc_endpoint_vtable *vtable; }; -#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H */ diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h index 25087be0c7..095ec5fcc9 100644 --- a/src/core/iomgr/endpoint_pair.h +++ b/src/core/iomgr/endpoint_pair.h @@ -44,4 +44,4 @@ typedef struct { grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read_slice_size); -#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c index e8295df8b3..db9d092dca 100644 --- a/src/core/iomgr/endpoint_pair_windows.c +++ b/src/core/iomgr/endpoint_pair_windows.c @@ -52,21 +52,26 @@ static void create_sockets(SOCKET sv[2]) { SOCKADDR_IN addr; int addr_len = sizeof(addr); - lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); + lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + WSA_FLAG_OVERLAPPED); GPR_ASSERT(lst_sock != INVALID_SOCKET); memset(&addr, 0, sizeof(addr)); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); addr.sin_family = AF_INET; - GPR_ASSERT(bind(lst_sock, (struct sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR); + GPR_ASSERT(bind(lst_sock, (struct sockaddr *)&addr, sizeof(addr)) != + SOCKET_ERROR); GPR_ASSERT(listen(lst_sock, SOMAXCONN) != SOCKET_ERROR); - GPR_ASSERT(getsockname(lst_sock, (struct sockaddr*)&addr, &addr_len) != SOCKET_ERROR); + GPR_ASSERT(getsockname(lst_sock, (struct sockaddr *)&addr, &addr_len) != + SOCKET_ERROR); - cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); + cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + WSA_FLAG_OVERLAPPED); GPR_ASSERT(cli_sock != INVALID_SOCKET); - GPR_ASSERT(WSAConnect(cli_sock, (struct sockaddr*)&addr, addr_len, NULL, NULL, NULL, NULL) == 0); - svr_sock = accept(lst_sock, (struct sockaddr*)&addr, &addr_len); + GPR_ASSERT(WSAConnect(cli_sock, (struct sockaddr *)&addr, addr_len, NULL, + NULL, NULL, NULL) == 0); + svr_sock = accept(lst_sock, (struct sockaddr *)&addr, &addr_len); GPR_ASSERT(svr_sock != INVALID_SOCKET); closesocket(lst_sock); @@ -77,7 +82,8 @@ static void create_sockets(SOCKET sv[2]) { sv[0] = svr_sock; } -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + size_t read_slice_size) { SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 8741241fb8..09a457dd9a 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -65,18 +65,17 @@ static void do_iocp_work() { LPOVERLAPPED overlapped; grpc_winsocket *socket; grpc_winsocket_callback_info *info; - void(*f)(void *, int) = NULL; + void (*f)(void *, int) = NULL; void *opaque = NULL; - success = GetQueuedCompletionStatus(g_iocp, &bytes, - &completion_key, &overlapped, - INFINITE); + success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, + &overlapped, INFINITE); /* success = 0 and overlapped = NULL means the deadline got attained. Which is impossible. since our wait time is +inf */ GPR_ASSERT(success || overlapped); GPR_ASSERT(completion_key && overlapped); if (overlapped == &g_iocp_custom_overlap) { gpr_atm_full_fetch_add(&g_custom_events, -1); - if (completion_key == (ULONG_PTR) &g_iocp_kick_token) { + if (completion_key == (ULONG_PTR)&g_iocp_kick_token) { /* We were awoken from a kick. */ return; } @@ -84,7 +83,7 @@ static void do_iocp_work() { abort(); } - socket = (grpc_winsocket*) completion_key; + socket = (grpc_winsocket *)completion_key; if (overlapped == &socket->write_info.overlapped) { info = &socket->write_info; } else if (overlapped == &socket->read_info.overlapped) { @@ -121,8 +120,7 @@ static void do_iocp_work() { } static void iocp_loop(void *p) { - while (gpr_atm_acq_load(&g_orphans) || - gpr_atm_acq_load(&g_custom_events) || + while (gpr_atm_acq_load(&g_orphans) || gpr_atm_acq_load(&g_custom_events) || !gpr_event_get(&g_shutdown_iocp)) { grpc_maybe_call_delayed_callbacks(NULL, 1); do_iocp_work(); @@ -134,8 +132,8 @@ static void iocp_loop(void *p) { void grpc_iocp_init(void) { gpr_thd_id id; - g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, - NULL, (ULONG_PTR)NULL, 0); + g_iocp = + CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0); GPR_ASSERT(g_iocp); gpr_event_init(&g_iocp_done); @@ -147,8 +145,7 @@ void grpc_iocp_kick(void) { BOOL success; gpr_atm_full_fetch_add(&g_custom_events, 1); - success = PostQueuedCompletionStatus(g_iocp, 0, - (ULONG_PTR) &g_iocp_kick_token, + success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR)&g_iocp_kick_token, &g_iocp_custom_overlap); GPR_ASSERT(success); } @@ -165,8 +162,8 @@ void grpc_iocp_shutdown(void) { void grpc_iocp_add_socket(grpc_winsocket *socket) { HANDLE ret; if (socket->added_to_iocp) return; - ret = CreateIoCompletionPort((HANDLE)socket->socket, - g_iocp, (gpr_uintptr) socket, 0); + ret = CreateIoCompletionPort((HANDLE)socket->socket, g_iocp, + (gpr_uintptr)socket, 0); if (!ret) { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message); @@ -189,7 +186,7 @@ void grpc_iocp_socket_orphan(grpc_winsocket *socket) { the callback now. -) The IOCP hasn't completed yet, and we're queuing it for later. */ static void socket_notify_on_iocp(grpc_winsocket *socket, - void(*cb)(void *, int), void *opaque, + void (*cb)(void *, int), void *opaque, grpc_winsocket_callback_info *info) { int run_now = 0; GPR_ASSERT(!info->cb); @@ -206,13 +203,13 @@ static void socket_notify_on_iocp(grpc_winsocket *socket, } void grpc_socket_notify_on_write(grpc_winsocket *socket, - void(*cb)(void *, int), void *opaque) { + void (*cb)(void *, int), void *opaque) { socket_notify_on_iocp(socket, cb, opaque, &socket->write_info); } -void grpc_socket_notify_on_read(grpc_winsocket *socket, - void(*cb)(void *, int), void *opaque) { +void grpc_socket_notify_on_read(grpc_winsocket *socket, void (*cb)(void *, int), + void *opaque) { socket_notify_on_iocp(socket, cb, opaque, &socket->read_info); } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index 9df6476917..ee3847a229 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -44,10 +44,10 @@ void grpc_iocp_shutdown(void); void grpc_iocp_add_socket(grpc_winsocket *); void grpc_iocp_socket_orphan(grpc_winsocket *); -void grpc_socket_notify_on_write(grpc_winsocket *, void(*cb)(void *, int success), - void *opaque); +void grpc_socket_notify_on_write(grpc_winsocket *, + void (*cb)(void *, int success), void *opaque); -void grpc_socket_notify_on_read(grpc_winsocket *, void(*cb)(void *, int success), - void *opaque); +void grpc_socket_notify_on_read(grpc_winsocket *, + void (*cb)(void *, int success), void *opaque); -#endif /* GRPC_INTERNAL_CORE_IOMGR_IOCP_WINDOWS_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_IOCP_WINDOWS_H */ diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 6d4a82917b..261c17366a 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -77,4 +77,4 @@ void grpc_iomgr_add_callback(grpc_iomgr_closure *closure); argument. */ void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success); -#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index 6c1e0e1799..4cec973ba0 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -52,4 +52,4 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj); void grpc_iomgr_platform_init(void); void grpc_iomgr_platform_shutdown(void); -#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c index 758ae77b86..2425e59941 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/iomgr_posix.c @@ -51,4 +51,4 @@ void grpc_iomgr_platform_shutdown(void) { grpc_fd_global_shutdown(); } -#endif /* GRPC_POSIX_SOCKET */ +#endif /* GRPC_POSIX_SOCKET */ diff --git a/src/core/iomgr/iomgr_posix.h b/src/core/iomgr/iomgr_posix.h index a404f6433e..716fedb636 100644 --- a/src/core/iomgr/iomgr_posix.h +++ b/src/core/iomgr/iomgr_posix.h @@ -39,4 +39,4 @@ void grpc_pollset_global_init(void); void grpc_pollset_global_shutdown(void); -#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_POSIX_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_POSIX_H */ diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c index 74cd5a829b..b49cb87e97 100644 --- a/src/core/iomgr/iomgr_windows.c +++ b/src/core/iomgr/iomgr_windows.c @@ -68,4 +68,4 @@ void grpc_iomgr_platform_shutdown(void) { winsock_shutdown(); } -#endif /* GRPC_WINSOCK_SOCKET */ +#endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index c474e4dbf1..337596cb74 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -74,10 +74,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset); grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will not be released by grpc_pollset_work AFTER worker has been destroyed. - Returns true if some work has been done, and false if the deadline - expired. */ -int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline); + Tries not to block past deadline. */ +void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec now, gpr_timespec deadline); /* Break one polling thread out of polling work for this pollset. If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 1320c64579..fe66ebed77 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -181,7 +181,7 @@ static void multipoll_with_epoll_pollset_maybe_work( pfds[1].events = POLLIN; pfds[1].revents = 0; - poll_rv = poll(pfds, 2, timeout_ms); + poll_rv = grpc_poll_function(pfds, 2, timeout_ms); if (poll_rv < 0) { if (errno != EINTR) { @@ -234,8 +234,7 @@ static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable multipoll_with_epoll_pollset = { - multipoll_with_epoll_pollset_add_fd, - multipoll_with_epoll_pollset_del_fd, + multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd, multipoll_with_epoll_pollset_maybe_work, multipoll_with_epoll_pollset_finish_shutdown, multipoll_with_epoll_pollset_destroy}; diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index b5b2d7534d..30ee6e24db 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -74,7 +74,7 @@ static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, } h->fds[h->fd_count++] = fd; GRPC_FD_REF(fd, "multipoller"); -exit: +exit: if (and_unlock_pollset) { gpr_mu_unlock(&pollset->mu); } @@ -144,7 +144,7 @@ static void multipoll_with_poll_pollset_maybe_work( POLLOUT, &watchers[i]); } - r = poll(pfds, pfd_count, timeout); + r = grpc_poll_function(pfds, pfd_count, timeout); for (i = 1; i < pfd_count; i++) { grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN, @@ -202,8 +202,7 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable multipoll_with_poll_pollset = { - multipoll_with_poll_pollset_add_fd, - multipoll_with_poll_pollset_del_fd, + multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, multipoll_with_poll_pollset_maybe_work, multipoll_with_poll_pollset_finish_shutdown, multipoll_with_poll_pollset_destroy}; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index d3a9193af1..6bd1b61f24 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -38,7 +38,6 @@ #include "src/core/iomgr/pollset_posix.h" #include <errno.h> -#include <poll.h> #include <stdlib.h> #include <string.h> #include <unistd.h> @@ -57,6 +56,8 @@ GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); +grpc_poll_function_type grpc_poll_function = poll; + static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next; worker->next->prev = worker->prev; @@ -89,6 +90,7 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { + /* pollset->mu already held */ if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { for (specific_worker = p->root_worker.next; @@ -140,10 +142,10 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); pollset->vtable->add_fd(pollset, fd, 1); - /* the following (enabled only in debug) will reacquire and then release - our lock - meaning that if the unlocking flag passed to del_fd above is - not respected, the code will deadlock (in a way that we have a chance of - debugging) */ +/* the following (enabled only in debug) will reacquire and then release + our lock - meaning that if the unlocking flag passed to del_fd above is + not respected, the code will deadlock (in a way that we have a chance of + debugging) */ #ifndef NDEBUG gpr_mu_lock(&pollset->mu); gpr_mu_unlock(&pollset->mu); @@ -153,10 +155,10 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); pollset->vtable->del_fd(pollset, fd, 1); - /* the following (enabled only in debug) will reacquire and then release - our lock - meaning that if the unlocking flag passed to del_fd above is - not respected, the code will deadlock (in a way that we have a chance of - debugging) */ +/* the following (enabled only in debug) will reacquire and then release + our lock - meaning that if the unlocking flag passed to del_fd above is + not respected, the code will deadlock (in a way that we have a chance of + debugging) */ #ifndef NDEBUG gpr_mu_lock(&pollset->mu); gpr_mu_unlock(&pollset->mu); @@ -168,14 +170,10 @@ static void finish_shutdown(grpc_pollset *pollset) { pollset->shutdown_done_cb(pollset->shutdown_done_arg); } -int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline) { +void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec now, gpr_timespec deadline) { /* pollset->mu already held */ - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); int added_worker = 0; - if (gpr_time_cmp(now, deadline) > 0) { - return 0; - } /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; /* TODO(ctiller): pool these */ @@ -217,7 +215,6 @@ done: gpr_mu_lock(&pollset->mu); } } - return 1; } void grpc_pollset_shutdown(grpc_pollset *pollset, @@ -456,7 +453,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, /* poll fd count (argument 2) is shortened by one if we have no events to poll on - such that it only includes the kicker */ - r = poll(pfd, nfds, timeout); + r = grpc_poll_function(pfd, nfds, timeout); GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); if (fd) { diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 1c1b736193..69bd9cca8c 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H +#include <poll.h> + #include <grpc/support/sync.h> #include "src/core/iomgr/wakeup_fd_posix.h" @@ -102,7 +104,8 @@ void grpc_kick_drain(grpc_pollset *p); - longer than a millisecond polls are rounded up to the next nearest millisecond to avoid spinning - infinite timeouts are converted to -1 */ -int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now); +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now); /* turn a pollset into a multipoller: platform specific */ typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset, @@ -117,4 +120,8 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds, * be locked) */ int grpc_pollset_has_workers(grpc_pollset *pollset); +/* override to allow tests to hook poll() usage */ +typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); +extern grpc_poll_function_type grpc_poll_function; + #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 22dc5891c3..07522c8a0c 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -56,8 +56,7 @@ static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { grpc_pollset_worker *w = p->root_worker.next; remove_worker(p, w); return w; - } - else { + } else { return NULL; } } @@ -100,13 +99,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); } -int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) { - gpr_timespec now; +void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec now, gpr_timespec deadline) { int added_worker = 0; - now = gpr_now(GPR_CLOCK_MONOTONIC); - if (gpr_time_cmp(now, deadline) > 0) { - return 0 /* GPR_FALSE */; - } worker->next = worker->prev = NULL; gpr_cv_init(&worker->cv); if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) { @@ -127,15 +122,14 @@ done: if (added_worker) { remove_worker(pollset, worker); } - return 1 /* GPR_TRUE */; } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { for (specific_worker = p->root_worker.next; - specific_worker != &p->root_worker; - specific_worker = specific_worker->next) { + specific_worker != &p->root_worker; + specific_worker = specific_worker->next) { gpr_cv_signal(&specific_worker->cv); } p->kicked_without_pollers = 1; diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h index 8f1d7a22bb..cc1bd428b0 100644 --- a/src/core/iomgr/resolve_address.h +++ b/src/core/iomgr/resolve_address.h @@ -66,4 +66,4 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses); grpc_resolved_addresses *grpc_blocking_resolve_address( const char *addr, const char *default_port); -#endif /* GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index dbf884c769..ce6972b797 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -105,10 +105,7 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( s = getaddrinfo(host, port, &hints, &result); if (s != 0) { /* Retry if well-known service name is recognized */ - char *svc[][2] = { - {"http", "80"}, - {"https", "443"} - }; + char *svc[][2] = {{"http", "80"}, {"https", "443"}}; int i; for (i = 0; i < (int)(sizeof(svc) / sizeof(svc[0])); i++) { if (strcmp(port, svc[i][0]) == 0) { diff --git a/src/core/iomgr/sockaddr.h b/src/core/iomgr/sockaddr.h index 7528db73b8..e41e1ec6b4 100644 --- a/src/core/iomgr/sockaddr.h +++ b/src/core/iomgr/sockaddr.h @@ -44,4 +44,4 @@ #include "src/core/iomgr/sockaddr_posix.h" #endif -#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_H */ diff --git a/src/core/iomgr/sockaddr_posix.h b/src/core/iomgr/sockaddr_posix.h index 2a3d932f70..388abb3306 100644 --- a/src/core/iomgr/sockaddr_posix.h +++ b/src/core/iomgr/sockaddr_posix.h @@ -41,4 +41,4 @@ #include <netdb.h> #include <unistd.h> -#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_POSIX_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_POSIX_H */ diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index 65ec1f94ac..efdc480365 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -206,7 +206,8 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) { case AF_UNIX: return 1; default: - gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port", addr->sa_family); + gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port", + addr->sa_family); return 0; } } @@ -220,7 +221,8 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port) { ((struct sockaddr_in6 *)addr)->sin6_port = htons(port); return 1; default: - gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", addr->sa_family); + gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", + addr->sa_family); return 0; } } diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h index 99f1ed54da..6f7a279900 100644 --- a/src/core/iomgr/sockaddr_utils.h +++ b/src/core/iomgr/sockaddr_utils.h @@ -86,4 +86,4 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, char *grpc_sockaddr_to_uri(const struct sockaddr *addr); -#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_UTILS_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/iomgr/sockaddr_win32.h b/src/core/iomgr/sockaddr_win32.h index be55db805a..fe2be99145 100644 --- a/src/core/iomgr/sockaddr_win32.h +++ b/src/core/iomgr/sockaddr_win32.h @@ -43,4 +43,4 @@ const char *inet_ntop(int af, const void *src, char *dst, socklen_t size); #endif -#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_WIN32_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_WIN32_H */ diff --git a/src/core/iomgr/socket_utils_posix.h b/src/core/iomgr/socket_utils_posix.h index d2a315b462..d330d1986e 100644 --- a/src/core/iomgr/socket_utils_posix.h +++ b/src/core/iomgr/socket_utils_posix.h @@ -110,4 +110,4 @@ extern int grpc_forbid_dualstack_sockets_for_testing; int grpc_create_dualstack_socket(const struct sockaddr *addr, int type, int protocol, grpc_dualstack_mode *dsmode); -#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_UTILS_POSIX_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_UTILS_POSIX_H */ diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index f6ddfff0ad..7d8421376b 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -106,4 +106,4 @@ void grpc_winsocket_destroy(grpc_winsocket *winsocket) { gpr_free(winsocket); } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index 346fde8edd..ecf2530173 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -54,7 +54,7 @@ typedef struct grpc_winsocket_callback_info { OVERLAPPED overlapped; /* The callback information for the pending operation. May be empty if the caller hasn't registered a callback yet. */ - void(*cb)(void *opaque, int success); + void (*cb)(void *opaque, int success); void *opaque; /* A boolean to describe if the IO Completion Port got a notification for that operation. This will happen if the operation completed before the @@ -118,4 +118,4 @@ void grpc_winsocket_orphan(grpc_winsocket *socket); or by grpc_winsocket_orphan if there's no pending operation. */ void grpc_winsocket_destroy(grpc_winsocket *socket); -#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h index 0fa08b52b0..8ad9b818e1 100644 --- a/src/core/iomgr/tcp_client.h +++ b/src/core/iomgr/tcp_client.h @@ -41,7 +41,7 @@ /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and - NULL on failure). + NULL on failure). interested_parties points to a set of pollsets that would be interested in this connection being established (in order to continue their work) */ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 9572ce5980..66027f87a0 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -264,7 +264,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->write_closure.cb_arg = ac; gpr_mu_lock(&ac->mu); - grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + grpc_alarm_init(&ac->alarm, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_fd_notify_on_write(ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 24fee0596f..360e6ebd8c 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -572,7 +572,8 @@ static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { grpc_pollset_add_fd(pollset, tcp->em_fd); } -static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) { +static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, + grpc_pollset_set *pollset_set) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); } diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h index d752feaeea..40b3ae2679 100644 --- a/src/core/iomgr/tcp_posix.h +++ b/src/core/iomgr/tcp_posix.h @@ -56,4 +56,4 @@ extern int grpc_tcp_trace; grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size, const char *peer_string); -#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */ diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 0adbe9507c..d0478d3604 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -79,7 +79,8 @@ struct grpc_tcp_server { /* active port count: how many ports are actually still listening */ int active_ports; - /* number of iomgr callbacks that have been explicitly scheduled during shutdown */ + /* number of iomgr callbacks that have been explicitly scheduled during + * shutdown */ int iomgr_callbacks_pending; /* all listening ports */ @@ -292,7 +293,7 @@ static void on_accept(void *arg, int from_iocp) { and act accordingly. */ transfered_bytes = 0; wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, - &transfered_bytes, FALSE, &flags); + &transfered_bytes, FALSE, &flags); if (!wsa_success) { if (sp->shutting_down) { /* During the shutdown case, we ARE expecting an error. So that's well, @@ -309,16 +310,15 @@ static void on_accept(void *arg, int from_iocp) { if (!sp->shutting_down) { peer_name_string = NULL; err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, - (char *)&sp->socket->socket, - sizeof(sp->socket->socket)); + (char *)&sp->socket->socket, sizeof(sp->socket->socket)); if (err) { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message); gpr_free(utf8_message); } - err = getpeername(sock, (struct sockaddr*)&peer_name, &peer_name_len); + err = getpeername(sock, (struct sockaddr *)&peer_name, &peer_name_len); if (!err) { - peer_name_string = grpc_sockaddr_to_uri((struct sockaddr*)&peer_name); + peer_name_string = grpc_sockaddr_to_uri((struct sockaddr *)&peer_name); } else { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message); diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 89aa741470..123f46d71d 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -55,24 +55,22 @@ static int set_non_block(SOCKET sock) { int status; unsigned long param = 1; DWORD ret; - status = WSAIoctl(sock, FIONBIO, ¶m, sizeof(param), NULL, 0, &ret, - NULL, NULL); + status = + WSAIoctl(sock, FIONBIO, ¶m, sizeof(param), NULL, 0, &ret, NULL, NULL); return status == 0; } static int set_dualstack(SOCKET sock) { int status; unsigned long param = 0; - status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, - (const char *) ¶m, sizeof(param)); + status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char *)¶m, + sizeof(param)); return status == 0; } int grpc_tcp_prepare_socket(SOCKET sock) { - if (!set_non_block(sock)) - return 0; - if (!set_dualstack(sock)) - return 0; + if (!set_non_block(sock)) return 0; + if (!set_dualstack(sock)) return 0; return 1; } @@ -100,9 +98,7 @@ typedef struct grpc_tcp { char *peer_string; } grpc_tcp; -static void tcp_ref(grpc_tcp *tcp) { - gpr_ref(&tcp->refcount); -} +static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } static void tcp_unref(grpc_tcp *tcp) { if (gpr_unref(&tcp->refcount)) { @@ -116,7 +112,7 @@ static void tcp_unref(grpc_tcp *tcp) { /* Asynchronous callback from the IOCP, or the background thread. */ static void on_read(void *tcpp, int from_iocp) { - grpc_tcp *tcp = (grpc_tcp *) tcpp; + grpc_tcp *tcp = (grpc_tcp *)tcpp; grpc_winsocket *socket = tcp->socket; gpr_slice sub; gpr_slice *slice = NULL; @@ -175,9 +171,9 @@ static void on_read(void *tcpp, int from_iocp) { cb(opaque, slice, nslices, status); } -static void win_notify_on_read(grpc_endpoint *ep, - grpc_endpoint_read_cb cb, void *arg) { - grpc_tcp *tcp = (grpc_tcp *) ep; +static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, + void *arg) { + grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->read_info; int status; @@ -201,8 +197,8 @@ static void win_notify_on_read(grpc_endpoint *ep, buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice); /* First let's try a synchronous, non-blocking read. */ - status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, - NULL, NULL); + status = + WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL); info->wsa_error = status == 0 ? 0 : WSAGetLastError(); /* Did we get data immediately ? Yay. */ @@ -232,7 +228,7 @@ static void win_notify_on_read(grpc_endpoint *ep, /* Asynchronous callback from the IOCP, or the background thread. */ static void on_write(void *tcpp, int from_iocp) { - grpc_tcp *tcp = (grpc_tcp *) tcpp; + grpc_tcp *tcp = (grpc_tcp *)tcpp; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->write_info; grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK; @@ -286,7 +282,7 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, gpr_slice *slices, size_t nslices, grpc_endpoint_write_cb cb, void *arg) { - grpc_tcp *tcp = (grpc_tcp *) ep; + grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *socket = tcp->socket; grpc_winsocket_callback_info *info = &socket->write_info; unsigned i; @@ -309,7 +305,7 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices); if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) { - buffers = (WSABUF *) gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count); + buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count); allocated = buffers; } @@ -370,15 +366,15 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) { grpc_tcp *tcp; - (void) ps; - tcp = (grpc_tcp *) ep; + (void)ps; + tcp = (grpc_tcp *)ep; grpc_iocp_add_socket(tcp->socket); } static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) { grpc_tcp *tcp; - (void) pss; - tcp = (grpc_tcp *) ep; + (void)pss; + tcp = (grpc_tcp *)ep; grpc_iocp_add_socket(tcp->socket); } @@ -389,7 +385,7 @@ static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) { callback will happen from another thread, so we need to protect against concurrent access of the data structure in that regard. */ static void win_shutdown(grpc_endpoint *ep) { - grpc_tcp *tcp = (grpc_tcp *) ep; + grpc_tcp *tcp = (grpc_tcp *)ep; int extra_refs = 0; gpr_mu_lock(&tcp->mu); /* At that point, what may happen is that we're already inside the IOCP @@ -401,7 +397,7 @@ static void win_shutdown(grpc_endpoint *ep) { } static void win_destroy(grpc_endpoint *ep) { - grpc_tcp *tcp = (grpc_tcp *) ep; + grpc_tcp *tcp = (grpc_tcp *)ep; tcp_unref(tcp); } @@ -410,13 +406,12 @@ static char *win_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static grpc_endpoint_vtable vtable = {win_notify_on_read, win_write, - win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, - win_get_peer}; +static grpc_endpoint_vtable vtable = { + win_notify_on_read, win_write, win_add_to_pollset, win_add_to_pollset_set, + win_shutdown, win_destroy, win_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { - grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp)); + grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp)); tcp->base.vtable = &vtable; tcp->socket = socket; @@ -427,4 +422,4 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { return &tcp->base; } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_windows.h b/src/core/iomgr/tcp_windows.h index 7e301db250..deb3e48293 100644 --- a/src/core/iomgr/tcp_windows.h +++ b/src/core/iomgr/tcp_windows.h @@ -54,4 +54,4 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string); int grpc_tcp_prepare_socket(SOCKET sock); -#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_WINDOWS_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_WINDOWS_H */ diff --git a/src/core/iomgr/time_averaged_stats.h b/src/core/iomgr/time_averaged_stats.h index 13894b2640..e6dec1b4cd 100644 --- a/src/core/iomgr/time_averaged_stats.h +++ b/src/core/iomgr/time_averaged_stats.h @@ -85,4 +85,4 @@ void grpc_time_averaged_stats_add_sample(grpc_time_averaged_stats *stats, value. */ double grpc_time_averaged_stats_update_average(grpc_time_averaged_stats *stats); -#endif /* GRPC_INTERNAL_CORE_IOMGR_TIME_AVERAGED_STATS_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_TIME_AVERAGED_STATS_H */ diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c new file mode 100644 index 0000000000..6429c38b28 --- /dev/null +++ b/src/core/iomgr/udp_server.c @@ -0,0 +1,440 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_SOCKET + +#include "src/core/iomgr/udp_server.h" + +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/un.h> +#include <unistd.h> + +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/support/string.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> + +#define INIT_PORT_CAP 2 + +/* one listening port */ +typedef struct { + int fd; + grpc_fd *emfd; + grpc_udp_server *server; + union { + gpr_uint8 untyped[GRPC_MAX_SOCKADDR_SIZE]; + struct sockaddr sockaddr; + struct sockaddr_un un; + } addr; + int addr_len; + grpc_iomgr_closure read_closure; + grpc_iomgr_closure destroyed_closure; + grpc_udp_server_read_cb read_cb; +} server_port; + +static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { + struct stat st; + + if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) { + unlink(un->sun_path); + } +} + +/* the overall server */ +struct grpc_udp_server { + grpc_udp_server_cb cb; + void *cb_arg; + + gpr_mu mu; + gpr_cv cv; + + /* active port count: how many ports are actually still listening */ + size_t active_ports; + /* destroyed port count: how many ports are completely destroyed */ + size_t destroyed_ports; + + /* is this server shutting down? (boolean) */ + int shutdown; + + /* all listening ports */ + server_port *ports; + size_t nports; + size_t port_capacity; + + /* shutdown callback */ + void (*shutdown_complete)(void *); + void *shutdown_complete_arg; + + /* all pollsets interested in new connections */ + grpc_pollset **pollsets; + /* number of pollsets in the pollsets array */ + size_t pollset_count; +}; + +grpc_udp_server *grpc_udp_server_create(void) { + grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); + gpr_mu_init(&s->mu); + gpr_cv_init(&s->cv); + s->active_ports = 0; + s->destroyed_ports = 0; + s->shutdown = 0; + s->cb = NULL; + s->cb_arg = NULL; + s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->nports = 0; + s->port_capacity = INIT_PORT_CAP; + + return s; +} + +static void finish_shutdown(grpc_udp_server *s) { + s->shutdown_complete(s->shutdown_complete_arg); + + gpr_mu_destroy(&s->mu); + gpr_cv_destroy(&s->cv); + + gpr_free(s->ports); + gpr_free(s); +} + +static void destroyed_port(void *server, int success) { + grpc_udp_server *s = server; + gpr_mu_lock(&s->mu); + s->destroyed_ports++; + if (s->destroyed_ports == s->nports) { + gpr_mu_unlock(&s->mu); + finish_shutdown(s); + } else { + gpr_mu_unlock(&s->mu); + } +} + +static void dont_care_about_shutdown_completion(void *ignored) {} + +/* called when all listening endpoints have been shutdown, so no further + events will be received on them - at this point it's safe to destroy + things */ +static void deactivated_all_ports(grpc_udp_server *s) { + size_t i; + + /* delete ALL the things */ + gpr_mu_lock(&s->mu); + + if (!s->shutdown) { + gpr_mu_unlock(&s->mu); + return; + } + + if (s->nports) { + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + if (sp->addr.sockaddr.sa_family == AF_UNIX) { + unlink_if_unix_domain_socket(&sp->addr.un); + } + sp->destroyed_closure.cb = destroyed_port; + sp->destroyed_closure.cb_arg = s; + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown"); + } + gpr_mu_unlock(&s->mu); + } else { + gpr_mu_unlock(&s->mu); + finish_shutdown(s); + } +} + +void grpc_udp_server_destroy( + grpc_udp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg), + void *shutdown_complete_arg) { + size_t i; + gpr_mu_lock(&s->mu); + + GPR_ASSERT(!s->shutdown); + s->shutdown = 1; + + s->shutdown_complete = shutdown_complete + ? shutdown_complete + : dont_care_about_shutdown_completion; + s->shutdown_complete_arg = shutdown_complete_arg; + + /* shutdown all fd's */ + if (s->active_ports) { + for (i = 0; i < s->nports; i++) { + grpc_fd_shutdown(s->ports[i].emfd); + } + gpr_mu_unlock(&s->mu); + } else { + gpr_mu_unlock(&s->mu); + deactivated_all_ports(s); + } +} + +/* Prepare a recently-created socket for listening. */ +static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) { + struct sockaddr_storage sockname_temp; + socklen_t sockname_len; + int get_local_ip; + int rc; + + if (fd < 0) { + goto error; + } + + get_local_ip = 1; + rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, + sizeof(get_local_ip)); + if (rc == 0 && addr->sa_family == AF_INET6) { +#if !TARGET_OS_IPHONE + rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, + sizeof(get_local_ip)); +#endif + } + + if (bind(fd, addr, addr_len) < 0) { + char *addr_str; + grpc_sockaddr_to_string(&addr_str, addr, 0); + gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); + gpr_free(addr_str); + goto error; + } + + sockname_len = sizeof(sockname_temp); + if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) { + goto error; + } + + return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + +error: + if (fd >= 0) { + close(fd); + } + return -1; +} + +/* event manager callback when reads are ready */ +static void on_read(void *arg, int success) { + server_port *sp = arg; + + if (success == 0) { + gpr_mu_lock(&sp->server->mu); + if (0 == --sp->server->active_ports) { + gpr_mu_unlock(&sp->server->mu); + deactivated_all_ports(sp->server); + } else { + gpr_mu_unlock(&sp->server->mu); + } + return; + } + + /* Tell the registered callback that data is available to read. */ + GPR_ASSERT(sp->read_cb); + sp->read_cb(sp->fd, sp->server->cb, sp->server->cb_arg); + + /* Re-arm the notification event so we get another chance to read. */ + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); +} + +static int add_socket_to_server(grpc_udp_server *s, int fd, + const struct sockaddr *addr, int addr_len, + grpc_udp_server_read_cb read_cb) { + server_port *sp; + int port; + char *addr_str; + char *name; + + port = prepare_socket(fd, addr, addr_len); + if (port >= 0) { + grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + gpr_asprintf(&name, "udp-server-listener:%s", addr_str); + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb && "must add ports before starting server"); + /* append it to the list under a lock */ + if (s->nports == s->port_capacity) { + s->port_capacity *= 2; + s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); + } + sp = &s->ports[s->nports++]; + sp->server = s; + sp->fd = fd; + sp->emfd = grpc_fd_create(fd, name); + memcpy(sp->addr.untyped, addr, addr_len); + sp->addr_len = addr_len; + sp->read_cb = read_cb; + GPR_ASSERT(sp->emfd); + gpr_mu_unlock(&s->mu); + } + + return port; +} + +int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int addr_len, + grpc_udp_server_read_cb read_cb) { + int allocated_port1 = -1; + int allocated_port2 = -1; + unsigned i; + int fd; + grpc_dualstack_mode dsmode; + struct sockaddr_in6 addr6_v4mapped; + struct sockaddr_in wild4; + struct sockaddr_in6 wild6; + struct sockaddr_in addr4_copy; + struct sockaddr *allocated_addr = NULL; + struct sockaddr_storage sockname_temp; + socklen_t sockname_len; + int port; + + if (((struct sockaddr *)addr)->sa_family == AF_UNIX) { + unlink_if_unix_domain_socket(addr); + } + + /* Check if this is a wildcard port, and if so, try to keep the port the same + as some previously created listener. */ + if (grpc_sockaddr_get_port(addr) == 0) { + for (i = 0; i < s->nports; i++) { + sockname_len = sizeof(sockname_temp); + if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp, + &sockname_len)) { + port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + if (port > 0) { + allocated_addr = malloc(addr_len); + memcpy(allocated_addr, addr, addr_len); + grpc_sockaddr_set_port(allocated_addr, port); + addr = allocated_addr; + break; + } + } + } + } + + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = (const struct sockaddr *)&addr6_v4mapped; + addr_len = sizeof(addr6_v4mapped); + } + + /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ + if (grpc_sockaddr_is_wildcard(addr, &port)) { + grpc_sockaddr_make_wildcards(port, &wild4, &wild6); + + /* Try listening on IPv6 first. */ + addr = (struct sockaddr *)&wild6; + addr_len = sizeof(wild6); + fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); + allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb); + if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { + goto done; + } + + /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ + if (port == 0 && allocated_port1 > 0) { + grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1); + } + addr = (struct sockaddr *)&wild4; + addr_len = sizeof(wild4); + } + + fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); + if (fd < 0) { + gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + } + if (dsmode == GRPC_DSMODE_IPV4 && + grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { + addr = (struct sockaddr *)&addr4_copy; + addr_len = sizeof(addr4_copy); + } + allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb); + +done: + gpr_free(allocated_addr); + return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; +} + +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { + return (index < s->nports) ? s->ports[index].fd : -1; +} + +void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, + size_t pollset_count, + grpc_udp_server_cb new_transport_cb, void *cb_arg) { + size_t i, j; + GPR_ASSERT(new_transport_cb); + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb); + GPR_ASSERT(s->active_ports == 0); + s->cb = new_transport_cb; + s->cb_arg = cb_arg; + s->pollsets = pollsets; + for (i = 0; i < s->nports; i++) { + for (j = 0; j < pollset_count; j++) { + grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd); + } + s->ports[i].read_closure.cb = on_read; + s->ports[i].read_closure.cb_arg = &s->ports[i]; + grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure); + s->active_ports++; + } + gpr_mu_unlock(&s->mu); +} + +/* TODO(rjshade): Add a test for this method. */ +void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len, + const struct sockaddr *peer_address) { + int rc; + rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, sizeof(peer_address)); + if (rc < 0) { + gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno)); + } +} + +#endif diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h new file mode 100644 index 0000000000..fcc4ba6e97 --- /dev/null +++ b/src/core/iomgr/udp_server.h @@ -0,0 +1,85 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H +#define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H + +#include "src/core/iomgr/endpoint.h" + +/* Forward decl of grpc_udp_server */ +typedef struct grpc_udp_server grpc_udp_server; + +/* New server callback: ep is the newly connected connection */ +typedef void (*grpc_udp_server_cb)(void *arg, grpc_endpoint *ep); + +/* Called when data is available to read from the socket. */ +typedef void (*grpc_udp_server_read_cb)(int fd, + grpc_udp_server_cb new_transport_cb, + void *cb_arg); + +/* Create a server, initially not bound to any ports */ +grpc_udp_server *grpc_udp_server_create(void); + +/* Start listening to bound ports */ +void grpc_udp_server_start(grpc_udp_server *server, grpc_pollset **pollsets, + size_t pollset_count, grpc_udp_server_cb cb, + void *cb_arg); + +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); + +/* Add a port to the server, returning port number on success, or negative + on failure. + + The :: and 0.0.0.0 wildcard addresses are treated identically, accepting + both IPv4 and IPv6 connections, but :: is the preferred style. This usually + creates one socket, but possibly two on systems which support IPv6, + but not dualstack sockets. */ + +/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle + all of the multiple socket port matching logic in one place */ +int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int addr_len, + grpc_udp_server_read_cb read_cb); + +void grpc_udp_server_destroy(grpc_udp_server *server, + void (*shutdown_done)(void *shutdown_done_arg), + void *shutdown_done_arg); + +/* Write the contents of buffer to the underlying UDP socket. */ +/* +void grpc_udp_server_write(grpc_udp_server *s, + const char *buffer, + int buf_len, + const struct sockaddr* to); + */ + +#endif /* GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H */ diff --git a/src/core/iomgr/wakeup_fd_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c index 52912235f8..08fdc74f17 100644 --- a/src/core/iomgr/wakeup_fd_eventfd.c +++ b/src/core/iomgr/wakeup_fd_eventfd.c @@ -75,8 +75,7 @@ static int eventfd_check_availability(void) { } const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = { - eventfd_create, eventfd_consume, eventfd_wakeup, eventfd_destroy, - eventfd_check_availability -}; + eventfd_create, eventfd_consume, eventfd_wakeup, eventfd_destroy, + eventfd_check_availability}; #endif /* GPR_LINUX_EVENTFD */ diff --git a/src/core/iomgr/wakeup_fd_nospecial.c b/src/core/iomgr/wakeup_fd_nospecial.c index c1038bf379..78d763c103 100644 --- a/src/core/iomgr/wakeup_fd_nospecial.c +++ b/src/core/iomgr/wakeup_fd_nospecial.c @@ -43,12 +43,9 @@ #include "src/core/iomgr/wakeup_fd_posix.h" #include <stddef.h> -static int check_availability_invalid(void) { - return 0; -} +static int check_availability_invalid(void) { return 0; } const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = { - NULL, NULL, NULL, NULL, check_availability_invalid -}; + NULL, NULL, NULL, NULL, check_availability_invalid}; -#endif /* GPR_POSIX_NO_SPECIAL_WAKEUP_FD */ +#endif /* GPR_POSIX_NO_SPECIAL_WAKEUP_FD */ diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c index 9fc4ee2388..bd643e8061 100644 --- a/src/core/iomgr/wakeup_fd_pipe.c +++ b/src/core/iomgr/wakeup_fd_pipe.c @@ -94,4 +94,4 @@ const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = { pipe_init, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability}; -#endif /* GPR_POSIX_WAKUP_FD */ +#endif /* GPR_POSIX_WAKUP_FD */ diff --git a/src/core/iomgr/wakeup_fd_pipe.h b/src/core/iomgr/wakeup_fd_pipe.h index aa8f977ddb..01a13a97c0 100644 --- a/src/core/iomgr/wakeup_fd_pipe.h +++ b/src/core/iomgr/wakeup_fd_pipe.h @@ -38,4 +38,4 @@ extern grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable; -#endif /* GRPC_INTERNAL_CORE_IOMGR_WAKEUP_FD_PIPE_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_WAKEUP_FD_PIPE_H */ diff --git a/src/core/iomgr/wakeup_fd_posix.c b/src/core/iomgr/wakeup_fd_posix.c index e48f5223fa..d09fb78d12 100644 --- a/src/core/iomgr/wakeup_fd_posix.c +++ b/src/core/iomgr/wakeup_fd_posix.c @@ -53,9 +53,7 @@ void grpc_wakeup_fd_global_init_force_fallback(void) { wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable; } -void grpc_wakeup_fd_global_destroy(void) { - wakeup_fd_vtable = NULL; -} +void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; } void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { wakeup_fd_vtable->init(fd_info); @@ -73,4 +71,4 @@ void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) { wakeup_fd_vtable->destroy(fd_info); } -#endif /* GPR_POSIX_WAKEUP_FD */ +#endif /* GPR_POSIX_WAKEUP_FD */ diff --git a/src/core/iomgr/wakeup_fd_posix.h b/src/core/iomgr/wakeup_fd_posix.h index a4da4df51f..b6c086900d 100644 --- a/src/core/iomgr/wakeup_fd_posix.h +++ b/src/core/iomgr/wakeup_fd_posix.h @@ -96,4 +96,4 @@ void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info); * wakeup_fd_nospecial.c if no such implementation exists. */ extern const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable; -#endif /* GRPC_INTERNAL_CORE_IOMGR_WAKEUP_FD_POSIX_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_WAKEUP_FD_POSIX_H */ diff --git a/src/core/json/json.h b/src/core/json/json.h index cac18ad885..573584bf6f 100644 --- a/src/core/json/json.h +++ b/src/core/json/json.h @@ -85,4 +85,4 @@ char* grpc_json_dump_to_string(grpc_json* json, int indent); grpc_json* grpc_json_create(grpc_json_type type); void grpc_json_destroy(grpc_json* json); -#endif /* GRPC_INTERNAL_CORE_JSON_JSON_H */ +#endif /* GRPC_INTERNAL_CORE_JSON_JSON_H */ diff --git a/src/core/json/json_common.h b/src/core/json/json_common.h index 84bf375916..481695b38b 100644 --- a/src/core/json/json_common.h +++ b/src/core/json/json_common.h @@ -46,4 +46,4 @@ typedef enum { GRPC_JSON_TOP_LEVEL } grpc_json_type; -#endif /* GRPC_INTERNAL_CORE_JSON_JSON_COMMON_H */ +#endif /* GRPC_INTERNAL_CORE_JSON_JSON_COMMON_H */ diff --git a/src/core/json/json_reader.c b/src/core/json/json_reader.c index c14094c290..c22d4edd47 100644 --- a/src/core/json/json_reader.c +++ b/src/core/json/json_reader.c @@ -42,27 +42,26 @@ static void json_reader_string_clear(grpc_json_reader* reader) { } static void json_reader_string_add_char(grpc_json_reader* reader, - gpr_uint32 c) { + gpr_uint32 c) { reader->vtable->string_add_char(reader->userdata, c); } static void json_reader_string_add_utf32(grpc_json_reader* reader, - gpr_uint32 utf32) { + gpr_uint32 utf32) { reader->vtable->string_add_utf32(reader->userdata, utf32); } -static gpr_uint32 - grpc_json_reader_read_char(grpc_json_reader* reader) { +static gpr_uint32 grpc_json_reader_read_char(grpc_json_reader* reader) { return reader->vtable->read_char(reader->userdata); } static void json_reader_container_begins(grpc_json_reader* reader, - grpc_json_type type) { + grpc_json_type type) { reader->vtable->container_begins(reader->userdata, type); } -static grpc_json_type - grpc_json_reader_container_ends(grpc_json_reader* reader) { +static grpc_json_type grpc_json_reader_container_ends( + grpc_json_reader* reader) { return reader->vtable->container_ends(reader->userdata); } @@ -101,8 +100,9 @@ void grpc_json_reader_init(grpc_json_reader* reader, } int grpc_json_reader_is_complete(grpc_json_reader* reader) { - return ((reader->depth == 0) && ((reader->state == GRPC_JSON_STATE_END) || - (reader->state == GRPC_JSON_STATE_VALUE_END))); + return ((reader->depth == 0) && + ((reader->state == GRPC_JSON_STATE_END) || + (reader->state == GRPC_JSON_STATE_VALUE_END))); } grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) { @@ -143,7 +143,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) { case GRPC_JSON_STATE_OBJECT_KEY_STRING: case GRPC_JSON_STATE_VALUE_STRING: if (c != ' ') return GRPC_JSON_PARSE_ERROR; - if (reader->unicode_high_surrogate != 0) return GRPC_JSON_PARSE_ERROR; + if (reader->unicode_high_surrogate != 0) + return GRPC_JSON_PARSE_ERROR; json_reader_string_add_char(reader, c); break; @@ -169,7 +170,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) { switch (reader->state) { case GRPC_JSON_STATE_OBJECT_KEY_STRING: case GRPC_JSON_STATE_VALUE_STRING: - if (reader->unicode_high_surrogate != 0) return GRPC_JSON_PARSE_ERROR; + if (reader->unicode_high_surrogate != 0) + return GRPC_JSON_PARSE_ERROR; json_reader_string_add_char(reader, c); break; @@ -253,7 +255,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) { /* This is the \\ case. */ case GRPC_JSON_STATE_STRING_ESCAPE: - if (reader->unicode_high_surrogate != 0) return GRPC_JSON_PARSE_ERROR; + if (reader->unicode_high_surrogate != 0) + return GRPC_JSON_PARSE_ERROR; json_reader_string_add_char(reader, '\\'); if (reader->escaped_string_was_key) { reader->state = GRPC_JSON_STATE_OBJECT_KEY_STRING; @@ -276,7 +279,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) { break; case GRPC_JSON_STATE_OBJECT_KEY_STRING: - if (reader->unicode_high_surrogate != 0) return GRPC_JSON_PARSE_ERROR; + if (reader->unicode_high_surrogate != 0) + return GRPC_JSON_PARSE_ERROR; if (c == '"') { reader->state = GRPC_JSON_STATE_OBJECT_KEY_END; json_reader_set_key(reader); @@ -288,7 +292,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) { break; case GRPC_JSON_STATE_VALUE_STRING: - if (reader->unicode_high_surrogate != 0) return GRPC_JSON_PARSE_ERROR; + if (reader->unicode_high_surrogate != 0) + return GRPC_JSON_PARSE_ERROR; if (c == '"') { reader->state = GRPC_JSON_STATE_VALUE_END; json_reader_set_string(reader); @@ -438,7 +443,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) { if (reader->unicode_high_surrogate == 0) return GRPC_JSON_PARSE_ERROR; utf32 = 0x10000; - utf32 += (gpr_uint32)((reader->unicode_high_surrogate - 0xd800) * 0x400); + utf32 += (gpr_uint32)( + (reader->unicode_high_surrogate - 0xd800) * 0x400); utf32 += (gpr_uint32)(reader->unicode_char - 0xdc00); json_reader_string_add_utf32(reader, utf32); reader->unicode_high_surrogate = 0; diff --git a/src/core/json/json_reader.h b/src/core/json/json_reader.h index b1a5ace8fb..4d5487f790 100644 --- a/src/core/json/json_reader.h +++ b/src/core/json/json_reader.h @@ -157,4 +157,4 @@ void grpc_json_reader_init(grpc_json_reader* reader, */ int grpc_json_reader_is_complete(grpc_json_reader* reader); -#endif /* GRPC_INTERNAL_CORE_JSON_JSON_READER_H */ +#endif /* GRPC_INTERNAL_CORE_JSON_JSON_READER_H */ diff --git a/src/core/json/json_string.c b/src/core/json/json_string.c index 03c1099167..e6622ec461 100644 --- a/src/core/json/json_string.c +++ b/src/core/json/json_string.c @@ -73,7 +73,6 @@ typedef struct { size_t allocated; } json_writer_userdata; - /* This function checks if there's enough space left in the output buffer, * and will enlarge it if necessary. We're only allocating chunks of 256 * bytes at a time (or multiples thereof). @@ -97,8 +96,8 @@ static void json_writer_output_char(void* userdata, char c) { state->free_space--; } -static void json_writer_output_string_with_len(void* userdata, - const char* str, size_t len) { +static void json_writer_output_string_with_len(void* userdata, const char* str, + size_t len) { json_writer_userdata* state = userdata; json_writer_output_check(userdata, len); memcpy(state->output + state->string_len, str, len); @@ -106,8 +105,7 @@ static void json_writer_output_string_with_len(void* userdata, state->free_space -= len; } -static void json_writer_output_string(void* userdata, - const char* str) { +static void json_writer_output_string(void* userdata, const char* str) { size_t len = strlen(str); json_writer_output_string_with_len(userdata, str, len); } @@ -184,8 +182,7 @@ static gpr_uint32 json_reader_read_char(void* userdata) { /* Helper function to create a new grpc_json object and link it into * our tree-in-progress inside our opaque structure. */ -static grpc_json* json_create_and_link(void* userdata, - grpc_json_type type) { +static grpc_json* json_create_and_link(void* userdata, grpc_json_type type) { json_reader_userdata* state = userdata; grpc_json* json = grpc_json_create(type); @@ -201,7 +198,7 @@ static grpc_json* json_create_and_link(void* userdata, json->parent->child = json; } if (json->parent->type == GRPC_JSON_OBJECT) { - json->key = (char*) state->key; + json->key = (char*)state->key; } } if (!state->top) { @@ -261,13 +258,13 @@ static void json_reader_set_key(void* userdata) { static void json_reader_set_string(void* userdata) { json_reader_userdata* state = userdata; grpc_json* json = json_create_and_link(userdata, GRPC_JSON_STRING); - json->value = (char*) state->string; + json->value = (char*)state->string; } static int json_reader_set_number(void* userdata) { json_reader_userdata* state = userdata; grpc_json* json = json_create_and_link(userdata, GRPC_JSON_NUMBER); - json->value = (char*) state->string; + json->value = (char*)state->string; return 1; } @@ -287,32 +284,25 @@ static void json_reader_set_null(void* userdata) { } static grpc_json_reader_vtable reader_vtable = { - json_reader_string_clear, - json_reader_string_add_char, - json_reader_string_add_utf32, - json_reader_read_char, - json_reader_container_begins, - json_reader_container_ends, - json_reader_set_key, - json_reader_set_string, - json_reader_set_number, - json_reader_set_true, - json_reader_set_false, - json_reader_set_null -}; + json_reader_string_clear, json_reader_string_add_char, + json_reader_string_add_utf32, json_reader_read_char, + json_reader_container_begins, json_reader_container_ends, + json_reader_set_key, json_reader_set_string, + json_reader_set_number, json_reader_set_true, + json_reader_set_false, json_reader_set_null}; /* And finally, let's define our public API. */ grpc_json* grpc_json_parse_string_with_len(char* input, size_t size) { grpc_json_reader reader; json_reader_userdata state; - grpc_json *json = NULL; + grpc_json* json = NULL; grpc_json_reader_status status; if (!input) return NULL; state.top = state.current_container = state.current_value = NULL; state.string = state.key = NULL; - state.string_ptr = state.input = (gpr_uint8*) input; + state.string_ptr = state.input = (gpr_uint8*)input; state.remaining_input = size; grpc_json_reader_init(&reader, &reader_vtable, &state); @@ -333,8 +323,8 @@ grpc_json* grpc_json_parse_string(char* input) { return grpc_json_parse_string_with_len(input, UNBOUND_JSON_STRING_LENGTH); } -static void json_dump_recursive(grpc_json_writer* writer, - grpc_json* json, int in_object) { +static void json_dump_recursive(grpc_json_writer* writer, grpc_json* json, + int in_object) { while (json) { if (in_object) grpc_json_writer_object_key(writer, json->key); @@ -370,10 +360,8 @@ static void json_dump_recursive(grpc_json_writer* writer, } static grpc_json_writer_vtable writer_vtable = { - json_writer_output_char, - json_writer_output_string, - json_writer_output_string_with_len -}; + json_writer_output_char, json_writer_output_string, + json_writer_output_string_with_len}; char* grpc_json_dump_to_string(grpc_json* json, int indent) { grpc_json_writer writer; diff --git a/src/core/json/json_writer.c b/src/core/json/json_writer.c index bed9a9bfa5..ca9c835825 100644 --- a/src/core/json/json_writer.c +++ b/src/core/json/json_writer.c @@ -41,11 +41,13 @@ static void json_writer_output_char(grpc_json_writer* writer, char c) { writer->vtable->output_char(writer->userdata, c); } -static void json_writer_output_string(grpc_json_writer* writer, const char* str) { +static void json_writer_output_string(grpc_json_writer* writer, + const char* str) { writer->vtable->output_string(writer->userdata, str); } -static void json_writer_output_string_with_len(grpc_json_writer* writer, const char* str, size_t len) { +static void json_writer_output_string_with_len(grpc_json_writer* writer, + const char* str, size_t len) { writer->vtable->output_string_with_len(writer->userdata, str, len); } @@ -58,8 +60,7 @@ void grpc_json_writer_init(grpc_json_writer* writer, int indent, writer->userdata = userdata; } -static void json_writer_output_indent( - grpc_json_writer* writer) { +static void json_writer_output_indent(grpc_json_writer* writer) { static const char spacesstr[] = " " " " @@ -99,14 +100,15 @@ static void json_writer_value_end(grpc_json_writer* writer) { } } -static void json_writer_escape_utf16(grpc_json_writer* writer, gpr_uint16 utf16) { +static void json_writer_escape_utf16(grpc_json_writer* writer, + gpr_uint16 utf16) { static const char hex[] = "0123456789abcdef"; json_writer_output_string_with_len(writer, "\\u", 2); json_writer_output_char(writer, hex[(utf16 >> 12) & 0x0f]); json_writer_output_char(writer, hex[(utf16 >> 8) & 0x0f]); json_writer_output_char(writer, hex[(utf16 >> 4) & 0x0f]); - json_writer_output_char(writer, hex[(utf16) & 0x0f]); + json_writer_output_char(writer, hex[(utf16)&0x0f]); } static void json_writer_escape_string(grpc_json_writer* writer, @@ -173,8 +175,8 @@ static void json_writer_escape_string(grpc_json_writer* writer, * Any other range is technically reserved for future usage, so if we * don't want the software to break in the future, we have to allow * anything else. The first non-unicode character is 0x110000. */ - if (((utf32 >= 0xd800) && (utf32 <= 0xdfff)) || - (utf32 >= 0x110000)) break; + if (((utf32 >= 0xd800) && (utf32 <= 0xdfff)) || (utf32 >= 0x110000)) + break; if (utf32 >= 0x10000) { /* If utf32 contains a character that is above 0xffff, it needs to be * broken down into a utf-16 surrogate pair. A surrogate pair is first @@ -194,7 +196,8 @@ static void json_writer_escape_string(grpc_json_writer* writer, */ utf32 -= 0x10000; json_writer_escape_utf16(writer, (gpr_uint16)(0xd800 | (utf32 >> 10))); - json_writer_escape_utf16(writer, (gpr_uint16)(0xdc00 | (utf32 & 0x3ff))); + json_writer_escape_utf16(writer, + (gpr_uint16)(0xdc00 | (utf32 & 0x3ff))); } else { json_writer_escape_utf16(writer, (gpr_uint16)utf32); } @@ -204,7 +207,8 @@ static void json_writer_escape_string(grpc_json_writer* writer, json_writer_output_char(writer, '"'); } -void grpc_json_writer_container_begins(grpc_json_writer* writer, grpc_json_type type) { +void grpc_json_writer_container_begins(grpc_json_writer* writer, + grpc_json_type type) { if (!writer->got_key) json_writer_value_end(writer); json_writer_output_indent(writer); json_writer_output_char(writer, type == GRPC_JSON_OBJECT ? '{' : '['); @@ -213,7 +217,8 @@ void grpc_json_writer_container_begins(grpc_json_writer* writer, grpc_json_type writer->depth++; } -void grpc_json_writer_container_ends(grpc_json_writer* writer, grpc_json_type type) { +void grpc_json_writer_container_ends(grpc_json_writer* writer, + grpc_json_type type) { if (writer->indent && !writer->container_empty) json_writer_output_char(writer, '\n'); writer->depth--; @@ -238,14 +243,16 @@ void grpc_json_writer_value_raw(grpc_json_writer* writer, const char* string) { writer->got_key = 0; } -void grpc_json_writer_value_raw_with_len(grpc_json_writer* writer, const char* string, size_t len) { +void grpc_json_writer_value_raw_with_len(grpc_json_writer* writer, + const char* string, size_t len) { if (!writer->got_key) json_writer_value_end(writer); json_writer_output_indent(writer); json_writer_output_string_with_len(writer, string, len); writer->got_key = 0; } -void grpc_json_writer_value_string(grpc_json_writer* writer, const char* string) { +void grpc_json_writer_value_string(grpc_json_writer* writer, + const char* string) { if (!writer->got_key) json_writer_value_end(writer); json_writer_output_indent(writer); json_writer_escape_string(writer, string); diff --git a/src/core/json/json_writer.h b/src/core/json/json_writer.h index dfa61a5fef..a299dfabf8 100644 --- a/src/core/json/json_writer.h +++ b/src/core/json/json_writer.h @@ -78,16 +78,20 @@ void grpc_json_writer_init(grpc_json_writer* writer, int indent, grpc_json_writer_vtable* vtable, void* userdata); /* Signals the beginning of a container. */ -void grpc_json_writer_container_begins(grpc_json_writer* writer, grpc_json_type type); +void grpc_json_writer_container_begins(grpc_json_writer* writer, + grpc_json_type type); /* Signals the end of a container. */ -void grpc_json_writer_container_ends(grpc_json_writer* writer, grpc_json_type type); +void grpc_json_writer_container_ends(grpc_json_writer* writer, + grpc_json_type type); /* Writes down an object key for the next value. */ void grpc_json_writer_object_key(grpc_json_writer* writer, const char* string); /* Sets a raw value. Useful for numbers. */ void grpc_json_writer_value_raw(grpc_json_writer* writer, const char* string); /* Sets a raw value with its length. Useful for values like true or false. */ -void grpc_json_writer_value_raw_with_len(grpc_json_writer* writer, const char* string, size_t len); +void grpc_json_writer_value_raw_with_len(grpc_json_writer* writer, + const char* string, size_t len); /* Sets a string value. It'll be escaped, and utf-8 validated. */ -void grpc_json_writer_value_string(grpc_json_writer* writer, const char* string); +void grpc_json_writer_value_string(grpc_json_writer* writer, + const char* string); -#endif /* GRPC_INTERNAL_CORE_JSON_JSON_WRITER_H */ +#endif /* GRPC_INTERNAL_CORE_JSON_JSON_WRITER_H */ diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h index 036d02f187..92dbab9042 100644 --- a/src/core/profiling/timers.h +++ b/src/core/profiling/timers.h @@ -88,7 +88,7 @@ enum grpc_profiling_tags { } while (0) #define GRPC_TIMER_IMPORTANT_MARK(tag, id) \ - do { \ + do { \ } while (0) #define GRPC_TIMER_BEGIN(tag, id) \ diff --git a/src/core/security/auth_filters.h b/src/core/security/auth_filters.h index ff921690e0..c179b54bec 100644 --- a/src/core/security/auth_filters.h +++ b/src/core/security/auth_filters.h @@ -39,4 +39,4 @@ extern const grpc_channel_filter grpc_client_auth_filter; extern const grpc_channel_filter grpc_server_auth_filter; -#endif /* GRPC_INTERNAL_CORE_SECURITY_AUTH_FILTERS_H */ +#endif /* GRPC_INTERNAL_CORE_SECURITY_AUTH_FILTERS_H */ diff --git a/src/core/security/base64.h b/src/core/security/base64.h index b9abc07b52..31ae982691 100644 --- a/src/core/security/base64.h +++ b/src/core/security/base64.h @@ -49,4 +49,4 @@ gpr_slice grpc_base64_decode(const char *b64, int url_safe); gpr_slice grpc_base64_decode_with_len(const char *b64, size_t b64_len, int url_safe); -#endif /* GRPC_INTERNAL_CORE_SECURITY_BASE64_H */ +#endif /* GRPC_INTERNAL_CORE_SECURITY_BASE64_H */ diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 410852da52..8e63978b82 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -200,7 +200,7 @@ static void auth_start_transport_op(grpc_call_element *elem, channel_data *chand = elem->channel_data; grpc_linked_mdelem *l; size_t i; - grpc_client_security_context* sec_ctx = NULL; + grpc_client_security_context *sec_ctx = NULL; if (calld->security_context_set == 0) { calld->security_context_set = 1; @@ -316,9 +316,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, (grpc_channel_security_connector *)GRPC_SECURITY_CONNECTOR_REF( sc, "client_auth_filter"); chand->md_ctx = metadata_context; - chand->authority_string = grpc_mdstr_from_string(chand->md_ctx, ":authority", 0); + chand->authority_string = + grpc_mdstr_from_string(chand->md_ctx, ":authority", 0); chand->path_string = grpc_mdstr_from_string(chand->md_ctx, ":path", 0); - chand->error_msg_key = grpc_mdstr_from_string(chand->md_ctx, "grpc-message", 0); + chand->error_msg_key = + grpc_mdstr_from_string(chand->md_ctx, "grpc-message", 0); chand->status_key = grpc_mdstr_from_string(chand->md_ctx, "grpc-status", 0); } diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 6421ce673d..8852cab3e7 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -793,16 +793,16 @@ void on_simulated_token_fetch_done(void *user_data, int success) { (grpc_credentials_metadata_request *)user_data; grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds; GPR_ASSERT(success); - r->cb(r->user_data, c->md_store->entries, - c->md_store->num_entries, GRPC_CREDENTIALS_OK); + r->cb(r->user_data, c->md_store->entries, c->md_store->num_entries, + GRPC_CREDENTIALS_OK); grpc_credentials_metadata_request_destroy(r); } static void md_only_test_get_request_metadata(grpc_credentials *creds, - grpc_pollset *pollset, - const char *service_url, - grpc_credentials_metadata_cb cb, - void *user_data) { + grpc_pollset *pollset, + const char *service_url, + grpc_credentials_metadata_cb cb, + void *user_data) { grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds; if (c->is_async) { @@ -854,10 +854,10 @@ static int access_token_has_request_metadata_only( } static void access_token_get_request_metadata(grpc_credentials *creds, - grpc_pollset *pollset, - const char *service_url, - grpc_credentials_metadata_cb cb, - void *user_data) { + grpc_pollset *pollset, + const char *service_url, + grpc_credentials_metadata_cb cb, + void *user_data) { grpc_access_token_credentials *c = (grpc_access_token_credentials *)creds; cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK); } diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h index 04736525dc..29cd1ac87f 100644 --- a/src/core/security/credentials.h +++ b/src/core/security/credentials.h @@ -192,8 +192,9 @@ void grpc_flush_cached_google_default_credentials(void); /* Metadata-only credentials with the specified key and value where asynchronicity can be simulated for testing. */ -grpc_credentials *grpc_md_only_test_credentials_create( - const char *md_key, const char *md_value, int is_async); +grpc_credentials *grpc_md_only_test_credentials_create(const char *md_key, + const char *md_value, + int is_async); /* Private constructor for jwt credentials from an already parsed json key. Takes ownership of the key. */ diff --git a/src/core/security/credentials_metadata.c b/src/core/security/credentials_metadata.c index 22c786be56..b8a132f1ea 100644 --- a/src/core/security/credentials_metadata.c +++ b/src/core/security/credentials_metadata.c @@ -47,7 +47,8 @@ static void store_ensure_capacity(grpc_credentials_md_store *store) { grpc_credentials_md_store *grpc_credentials_md_store_create( size_t initial_capacity) { - grpc_credentials_md_store *store = gpr_malloc(sizeof(grpc_credentials_md_store)); + grpc_credentials_md_store *store = + gpr_malloc(sizeof(grpc_credentials_md_store)); memset(store, 0, sizeof(grpc_credentials_md_store)); if (initial_capacity > 0) { store->entries = gpr_malloc(initial_capacity * sizeof(grpc_credentials_md)); @@ -98,4 +99,3 @@ void grpc_credentials_md_store_unref(grpc_credentials_md_store *store) { gpr_free(store); } } - diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index d1f228665f..3631de867a 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -115,7 +115,7 @@ static int is_stack_running_on_compute_engine(void) { gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); while (!detector.is_done) { grpc_pollset_worker worker; - grpc_pollset_work(&detector.pollset, &worker, + grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); @@ -203,8 +203,8 @@ end: /* Blend with default ssl credentials and add a global reference so that it can be cached and re-served. */ grpc_credentials *ssl_creds = grpc_ssl_credentials_create(NULL, NULL); - default_credentials = grpc_credentials_ref(grpc_composite_credentials_create( - ssl_creds, result)); + default_credentials = grpc_credentials_ref( + grpc_composite_credentials_create(ssl_creds, result)); GPR_ASSERT(default_credentials != NULL); grpc_credentials_unref(ssl_creds); grpc_credentials_unref(result); diff --git a/src/core/security/json_token.h b/src/core/security/json_token.h index 091dfefb6e..7e06864ff3 100644 --- a/src/core/security/json_token.h +++ b/src/core/security/json_token.h @@ -115,4 +115,4 @@ grpc_auth_refresh_token grpc_auth_refresh_token_create_from_json( /* Destructs the object. */ void grpc_auth_refresh_token_destruct(grpc_auth_refresh_token *refresh_token); -#endif /* GRPC_INTERNAL_CORE_SECURITY_JSON_TOKEN_H */ +#endif /* GRPC_INTERNAL_CORE_SECURITY_JSON_TOKEN_H */ diff --git a/src/core/security/jwt_verifier.h b/src/core/security/jwt_verifier.h index 8077e24883..7a32debfcb 100644 --- a/src/core/security/jwt_verifier.h +++ b/src/core/security/jwt_verifier.h @@ -133,4 +133,3 @@ grpc_jwt_verifier_status grpc_jwt_claims_check(const grpc_jwt_claims *claims, const char *audience); #endif /* GRPC_INTERNAL_CORE_SECURITY_JWT_VERIFIER_H */ - diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 95fbf71f3d..81b3e33cb2 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -332,7 +332,7 @@ static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, } static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set) { secure_endpoint *ep = (secure_endpoint *)secure_ep; grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set); } diff --git a/src/core/security/secure_endpoint.h b/src/core/security/secure_endpoint.h index 93c29b5111..c563bdd9c5 100644 --- a/src/core/security/secure_endpoint.h +++ b/src/core/security/secure_endpoint.h @@ -46,4 +46,4 @@ grpc_endpoint *grpc_secure_endpoint_create( struct tsi_frame_protector *protector, grpc_endpoint *to_wrap, gpr_slice *leftover_slices, size_t leftover_nslices); -#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_ENDPOINT_H */ +#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_ENDPOINT_H */ diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/secure_transport_setup.h index 29025f5236..d9b802556d 100644 --- a/src/core/security/secure_transport_setup.h +++ b/src/core/security/secure_transport_setup.h @@ -50,4 +50,4 @@ void grpc_setup_secure_transport(grpc_security_connector *connector, grpc_secure_transport_setup_done_cb cb, void *user_data); -#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H */ +#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H */ diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index a354536dcd..ba9ac68c5f 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -575,6 +575,16 @@ grpc_security_status grpc_ssl_channel_security_connector_create( if (!check_request_metadata_creds(request_metadata_creds)) { goto error; } + if (config->pem_root_certs == NULL) { + pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs); + if (pem_root_certs == NULL || pem_root_certs_size == 0) { + gpr_log(GPR_ERROR, "Could not get default pem root certs."); + goto error; + } + } else { + pem_root_certs = config->pem_root_certs; + pem_root_certs_size = config->pem_root_certs_size; + } c = gpr_malloc(sizeof(grpc_ssl_channel_security_connector)); memset(c, 0, sizeof(grpc_ssl_channel_security_connector)); @@ -590,16 +600,6 @@ grpc_security_status grpc_ssl_channel_security_connector_create( if (overridden_target_name != NULL) { c->overridden_target_name = gpr_strdup(overridden_target_name); } - if (config->pem_root_certs == NULL) { - pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs); - if (pem_root_certs == NULL || pem_root_certs_size == 0) { - gpr_log(GPR_ERROR, "Could not get default pem root certs."); - goto error; - } - } else { - pem_root_certs = config->pem_root_certs; - pem_root_certs_size = config->pem_root_certs_size; - } result = tsi_create_ssl_client_handshaker_factory( config->pem_private_key, config->pem_private_key_size, config->pem_cert_chain, config->pem_cert_chain_size, pem_root_certs, diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c index 1ef0fc9255..c1b434f302 100644 --- a/src/core/security/security_context.c +++ b/src/core/security/security_context.c @@ -204,8 +204,7 @@ int grpc_auth_context_set_peer_identity_property_name(grpc_auth_context *ctx, return 1; } -int grpc_auth_context_peer_is_authenticated( - const grpc_auth_context *ctx) { +int grpc_auth_context_peer_is_authenticated(const grpc_auth_context *ctx) { return ctx->peer_identity_property_name == NULL ? 0 : 1; } @@ -326,4 +325,3 @@ grpc_auth_metadata_processor *grpc_find_auth_metadata_processor_in_args( } return NULL; } - diff --git a/src/core/security/security_context.h b/src/core/security/security_context.h index 7fcd438cf6..a9a0306410 100644 --- a/src/core/security/security_context.h +++ b/src/core/security/security_context.h @@ -112,5 +112,4 @@ grpc_auth_metadata_processor *grpc_auth_metadata_processor_from_arg( grpc_auth_metadata_processor *grpc_find_auth_metadata_processor_in_args( const grpc_channel_args *args); -#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONTEXT_H */ - +#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONTEXT_H */ diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 98d7788c83..57729be32c 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -105,24 +105,34 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) { return md; } -static void on_md_processing_done(void *user_data, - const grpc_metadata *consumed_md, - size_t num_consumed_md, int success) { +static void on_md_processing_done( + void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, + const grpc_metadata *response_md, size_t num_response_md, + grpc_status_code status, const char *error_details) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - if (success) { + /* TODO(jboeuf): Implement support for response_md. */ + if (response_md != NULL && num_response_md > 0) { + gpr_log(GPR_INFO, + "response_md in auth metadata processing not supported for now. " + "Ignoring..."); + } + + if (status == GRPC_STATUS_OK) { calld->consumed_md = consumed_md; calld->num_consumed_md = num_consumed_md; grpc_metadata_batch_filter(&calld->md_op->data.metadata, remove_consumed_md, elem); - calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1); } else { - gpr_slice message = gpr_slice_from_copied_string( - "Authentication metadata processing failed."); + gpr_slice message; + error_details = error_details != NULL + ? error_details + : "Authentication metadata processing failed."; + message = gpr_slice_from_copied_string(error_details); grpc_sopb_reset(calld->recv_ops); - grpc_transport_stream_op_add_close(&calld->transport_op, - GRPC_STATUS_UNAUTHENTICATED, &message); + grpc_transport_stream_op_add_close(&calld->transport_op, status, &message); grpc_call_next_op(elem, &calld->transport_op); } grpc_metadata_array_destroy(&calld->md); @@ -212,8 +222,7 @@ static void init_call_elem(grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem) { -} +static void destroy_call_elem(grpc_call_element *elem) {} /* Constructor for channel_data */ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, diff --git a/src/core/statistics/census_interface.h b/src/core/statistics/census_interface.h index eb4349c311..ac1ff24866 100644 --- a/src/core/statistics/census_interface.h +++ b/src/core/statistics/census_interface.h @@ -73,4 +73,4 @@ census_op_id census_tracing_start_op(void); /* Ends tracing. Calling this function will invalidate the input op_id. */ void census_tracing_end_op(census_op_id op_id); -#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_INTERFACE_H */ +#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_INTERFACE_H */ diff --git a/src/core/statistics/census_log.h b/src/core/statistics/census_log.h index 06869b7a33..60b6d597df 100644 --- a/src/core/statistics/census_log.h +++ b/src/core/statistics/census_log.h @@ -88,4 +88,4 @@ size_t census_log_remaining_space(void); out-of-space. */ int census_log_out_of_space_count(void); -#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_LOG_H */ +#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_LOG_H */ diff --git a/src/core/statistics/census_rpc_stats.c b/src/core/statistics/census_rpc_stats.c index 3e571b1143..b836987cf0 100644 --- a/src/core/statistics/census_rpc_stats.c +++ b/src/core/statistics/census_rpc_stats.c @@ -85,8 +85,8 @@ static void delete_key(void* key) { gpr_free(key); } static const census_ht_option ht_opt = { CENSUS_HT_POINTER /* key type */, 1999 /* n_of_buckets */, - simple_hash /* hash function */, cmp_str_keys /* key comparator */, - delete_stats /* data deleter */, delete_key /* key deleter */ + simple_hash /* hash function */, cmp_str_keys /* key comparator */, + delete_stats /* data deleter */, delete_key /* key deleter */ }; static void init_rpc_stats(void* stats) { diff --git a/src/core/statistics/census_rpc_stats.h b/src/core/statistics/census_rpc_stats.h index 9336dce1f8..aec31c1971 100644 --- a/src/core/statistics/census_rpc_stats.h +++ b/src/core/statistics/census_rpc_stats.h @@ -98,4 +98,4 @@ void census_stats_store_shutdown(void); } #endif -#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_RPC_STATS_H */ +#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_RPC_STATS_H */ diff --git a/src/core/statistics/census_tracing.c b/src/core/statistics/census_tracing.c index 3036ba5407..f2a09dc06e 100644 --- a/src/core/statistics/census_tracing.c +++ b/src/core/statistics/census_tracing.c @@ -60,8 +60,11 @@ static void delete_trace_obj(void* obj) { } static const census_ht_option ht_opt = { - CENSUS_HT_UINT64 /* key type*/, 571 /* n_of_buckets */, NULL /* hash */, - NULL /* compare_keys */, delete_trace_obj /* delete data */, + CENSUS_HT_UINT64 /* key type*/, + 571 /* n_of_buckets */, + NULL /* hash */, + NULL /* compare_keys */, + delete_trace_obj /* delete data */, NULL /* delete key */ }; diff --git a/src/core/statistics/census_tracing.h b/src/core/statistics/census_tracing.h index a4494b510c..08305c2469 100644 --- a/src/core/statistics/census_tracing.h +++ b/src/core/statistics/census_tracing.h @@ -93,4 +93,4 @@ census_trace_obj** census_get_active_ops(int* num_active_ops); } #endif -#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_TRACING_H */ +#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_TRACING_H */ diff --git a/src/core/statistics/hash_table.h b/src/core/statistics/hash_table.h index 7bcb4bcd9b..b7f8e11af4 100644 --- a/src/core/statistics/hash_table.h +++ b/src/core/statistics/hash_table.h @@ -128,4 +128,4 @@ typedef void (*census_ht_itr_cb)(census_ht_key key, const void* val_ptr, should not invalidate data entries. */ gpr_uint64 census_ht_for_all(const census_ht* ht, census_ht_itr_cb); -#endif /* GRPC_INTERNAL_CORE_STATISTICS_HASH_TABLE_H */ +#endif /* GRPC_INTERNAL_CORE_STATISTICS_HASH_TABLE_H */ diff --git a/src/core/support/cpu_iphone.c b/src/core/support/cpu_iphone.c index d412a6d7ee..82b49b47bc 100644 --- a/src/core/support/cpu_iphone.c +++ b/src/core/support/cpu_iphone.c @@ -36,9 +36,7 @@ #ifdef GPR_CPU_IPHONE /* Probably 2 instead of 1, but see comment on gpr_cpu_current_cpu. */ -unsigned gpr_cpu_num_cores(void) { - return 1; -} +unsigned gpr_cpu_num_cores(void) { return 1; } /* Most code that's using this is using it to shard across work queues. So unless profiling shows it's a problem or there appears a way to detect the @@ -46,8 +44,6 @@ unsigned gpr_cpu_num_cores(void) { Note that the interface in cpu.h lets gpr_cpu_num_cores return 0, but doing it makes it impossible for gpr_cpu_current_cpu to satisfy its stated range, and some code might be relying on it. */ -unsigned gpr_cpu_current_cpu(void) { - return 0; -} +unsigned gpr_cpu_current_cpu(void) { return 0; } #endif /* GPR_CPU_IPHONE */ diff --git a/src/core/support/cpu_linux.c b/src/core/support/cpu_linux.c index 282d4daab1..7af6a8f009 100644 --- a/src/core/support/cpu_linux.c +++ b/src/core/support/cpu_linux.c @@ -33,7 +33,7 @@ #ifndef _GNU_SOURCE #define _GNU_SOURCE -#endif /* _GNU_SOURCE */ +#endif /* _GNU_SOURCE */ #include <grpc/support/port_platform.h> diff --git a/src/core/support/env.h b/src/core/support/env.h index 4f2e394d14..24172d8673 100644 --- a/src/core/support/env.h +++ b/src/core/support/env.h @@ -57,4 +57,4 @@ void gpr_setenv(const char *name, const char *value); } #endif -#endif /* GRPC_INTERNAL_CORE_SUPPORT_ENV_H */ +#endif /* GRPC_INTERNAL_CORE_SUPPORT_ENV_H */ diff --git a/src/core/support/file.h b/src/core/support/file.h index 1dafe390e3..d8b7cea44f 100644 --- a/src/core/support/file.h +++ b/src/core/support/file.h @@ -60,4 +60,4 @@ FILE *gpr_tmpfile(const char *prefix, char **tmp_filename); } #endif -#endif /* GRPC_INTERNAL_CORE_SUPPORT_FILE_H */ +#endif /* GRPC_INTERNAL_CORE_SUPPORT_FILE_H */ diff --git a/src/core/support/histogram.c b/src/core/support/histogram.c index 9029703891..78dbf98684 100644 --- a/src/core/support/histogram.c +++ b/src/core/support/histogram.c @@ -191,15 +191,18 @@ static double threshold_for_count_below(gpr_histogram *h, double count_below) { break; } } - return (bucket_start(h, (double)lower_idx) + bucket_start(h, (double)upper_idx)) / 2.0; + return (bucket_start(h, (double)lower_idx) + + bucket_start(h, (double)upper_idx)) / + 2.0; } else { /* treat values as uniform throughout the bucket, and find where this value should lie */ lower_bound = bucket_start(h, (double)lower_idx); upper_bound = bucket_start(h, (double)(lower_idx + 1)); - return GPR_CLAMP(upper_bound - (upper_bound - lower_bound) * - (count_so_far - count_below) / - h->buckets[lower_idx], + return GPR_CLAMP(upper_bound - + (upper_bound - lower_bound) * + (count_so_far - count_below) / + h->buckets[lower_idx], h->min_seen, h->max_seen); } } diff --git a/src/core/support/log_linux.c b/src/core/support/log_linux.c index 5ac36e7b95..02f64d8b7e 100644 --- a/src/core/support/log_linux.c +++ b/src/core/support/log_linux.c @@ -93,8 +93,8 @@ void gpr_default_log(gpr_log_func_args *args) { } gpr_asprintf(&prefix, "%s%s.%09d %7tu %s:%d]", - gpr_log_severity_string(args->severity), time_buffer, - (int)(now.tv_nsec), gettid(), display_file, args->line); + gpr_log_severity_string(args->severity), time_buffer, + (int)(now.tv_nsec), gettid(), display_file, args->line); fprintf(stderr, "%-60s %s\n", prefix, args->message); gpr_free(prefix); diff --git a/src/core/support/murmur_hash.h b/src/core/support/murmur_hash.h index 85ab2fe4bf..343fcb99f7 100644 --- a/src/core/support/murmur_hash.h +++ b/src/core/support/murmur_hash.h @@ -41,4 +41,4 @@ /* compute the hash of key (length len) */ gpr_uint32 gpr_murmur_hash3(const void *key, size_t len, gpr_uint32 seed); -#endif /* GRPC_INTERNAL_CORE_SUPPORT_MURMUR_HASH_H */ +#endif /* GRPC_INTERNAL_CORE_SUPPORT_MURMUR_HASH_H */ diff --git a/src/core/support/slice.c b/src/core/support/slice.c index e4196a48c6..53024e88f1 100644 --- a/src/core/support/slice.c +++ b/src/core/support/slice.c @@ -284,7 +284,8 @@ gpr_slice gpr_slice_split_head(gpr_slice *source, size_t split) { head.refcount = NULL; head.data.inlined.length = (gpr_uint8)split; memcpy(head.data.inlined.bytes, source->data.inlined.bytes, split); - source->data.inlined.length = (gpr_uint8)(source->data.inlined.length - split); + source->data.inlined.length = + (gpr_uint8)(source->data.inlined.length - split); memmove(source->data.inlined.bytes, source->data.inlined.bytes + split, source->data.inlined.length); } else if (split < sizeof(head.data.inlined.bytes)) { diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c index 6e6c72a2bf..987d5cb9b5 100644 --- a/src/core/support/slice_buffer.c +++ b/src/core/support/slice_buffer.c @@ -116,7 +116,8 @@ void gpr_slice_buffer_add(gpr_slice_buffer *sb, gpr_slice s) { GPR_SLICE_INLINED_SIZE) { memcpy(back->data.inlined.bytes + back->data.inlined.length, s.data.inlined.bytes, s.data.inlined.length); - back->data.inlined.length = (gpr_uint8)(back->data.inlined.length + s.data.inlined.length); + back->data.inlined.length = + (gpr_uint8)(back->data.inlined.length + s.data.inlined.length); } else { size_t cp1 = GPR_SLICE_INLINED_SIZE - back->data.inlined.length; memcpy(back->data.inlined.bytes + back->data.inlined.length, diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c index bc741f8c70..27ecf62280 100644 --- a/src/core/support/stack_lockfree.c +++ b/src/core/support/stack_lockfree.c @@ -67,7 +67,7 @@ typedef union lockfree_node { #define ENTRY_ALIGNMENT_BITS 3 /* make sure that entries aligned to 8-bytes */ #define INVALID_ENTRY_INDEX \ ((1 << 16) - 1) /* reserve this entry as invalid \ - */ + */ struct gpr_stack_lockfree { lockfree_node *entries; @@ -75,7 +75,7 @@ struct gpr_stack_lockfree { #ifndef NDEBUG /* Bitmap of pushed entries to check for double-push or pop */ - gpr_atm pushed[(INVALID_ENTRY_INDEX+1)/(8*sizeof(gpr_atm))]; + gpr_atm pushed[(INVALID_ENTRY_INDEX + 1) / (8 * sizeof(gpr_atm))]; #endif }; @@ -123,13 +123,13 @@ int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { #ifndef NDEBUG /* Check for double push */ { - int pushed_index = entry / (8*sizeof(gpr_atm)); - int pushed_bit = entry % (8*sizeof(gpr_atm)); + int pushed_index = entry / (8 * sizeof(gpr_atm)); + int pushed_bit = entry % (8 * sizeof(gpr_atm)); gpr_atm old_val; old_val = gpr_atm_no_barrier_fetch_add(&stack->pushed[pushed_index], - (gpr_atm)(1UL << pushed_bit)); - GPR_ASSERT((old_val & (1UL<<pushed_bit)) == 0); + (gpr_atm)(1UL << pushed_bit)); + GPR_ASSERT((old_val & (1UL << pushed_bit)) == 0); } #endif @@ -161,13 +161,13 @@ int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { #ifndef NDEBUG /* Check for valid pop */ { - int pushed_index = head.contents.index / (8*sizeof(gpr_atm)); - int pushed_bit = head.contents.index % (8*sizeof(gpr_atm)); + int pushed_index = head.contents.index / (8 * sizeof(gpr_atm)); + int pushed_bit = head.contents.index % (8 * sizeof(gpr_atm)); gpr_atm old_val; old_val = gpr_atm_no_barrier_fetch_add(&stack->pushed[pushed_index], - -(gpr_atm)(1UL << pushed_bit)); - GPR_ASSERT((old_val & (1UL<<pushed_bit)) != 0); + -(gpr_atm)(1UL << pushed_bit)); + GPR_ASSERT((old_val & (1UL << pushed_bit)) != 0); } #endif diff --git a/src/core/support/string.c b/src/core/support/string.c index 9babbd910a..af0389ea83 100644 --- a/src/core/support/string.c +++ b/src/core/support/string.c @@ -125,7 +125,6 @@ char *gpr_dump_slice(gpr_slice s, gpr_uint32 flags) { flags); } - int gpr_parse_bytes_to_uint32(const char *buf, size_t len, gpr_uint32 *result) { gpr_uint32 out = 0; gpr_uint32 new; @@ -187,9 +186,9 @@ char *gpr_strjoin_sep(const char **strs, size_t nstrs, const char *sep, for (i = 0; i < nstrs; i++) { out_length += strlen(strs[i]); } - out_length += 1; /* null terminator */ + out_length += 1; /* null terminator */ if (nstrs > 0) { - out_length += sep_len * (nstrs - 1); /* separators */ + out_length += sep_len * (nstrs - 1); /* separators */ } out = gpr_malloc(out_length); out_length = 0; @@ -214,10 +213,8 @@ char *gpr_strjoin_sep(const char **strs, size_t nstrs, const char *sep, * str. * * Returns 1 and updates \a begin and \a end. Returns 0 otherwise. */ -static int slice_find_separator_offset(const gpr_slice str, - const char *sep, - const size_t read_offset, - size_t *begin, +static int slice_find_separator_offset(const gpr_slice str, const char *sep, + const size_t read_offset, size_t *begin, size_t *end) { size_t i; const gpr_uint8 *str_ptr = GPR_SLICE_START_PTR(str) + read_offset; @@ -255,9 +252,7 @@ void gpr_slice_split(gpr_slice str, const char *sep, gpr_slice_buffer *dst) { } } -void gpr_strvec_init(gpr_strvec *sv) { - memset(sv, 0, sizeof(*sv)); -} +void gpr_strvec_init(gpr_strvec *sv) { memset(sv, 0, sizeof(*sv)); } void gpr_strvec_destroy(gpr_strvec *sv) { size_t i; @@ -270,11 +265,11 @@ void gpr_strvec_destroy(gpr_strvec *sv) { void gpr_strvec_add(gpr_strvec *sv, char *str) { if (sv->count == sv->capacity) { sv->capacity = GPR_MAX(sv->capacity + 8, sv->capacity * 2); - sv->strs = gpr_realloc(sv->strs, sizeof(char*) * sv->capacity); + sv->strs = gpr_realloc(sv->strs, sizeof(char *) * sv->capacity); } sv->strs[sv->count++] = str; } char *gpr_strvec_flatten(gpr_strvec *sv, size_t *final_length) { - return gpr_strjoin((const char**)sv->strs, sv->count, final_length); + return gpr_strjoin((const char **)sv->strs, sv->count, final_length); } diff --git a/src/core/support/string.h b/src/core/support/string.h index 3ac4abeef8..a28e00fd3e 100644 --- a/src/core/support/string.h +++ b/src/core/support/string.h @@ -47,7 +47,7 @@ extern "C" { /* String utility functions */ /* Flags for gpr_dump function. */ -#define GPR_DUMP_HEX 0x00000001 +#define GPR_DUMP_HEX 0x00000001 #define GPR_DUMP_ASCII 0x00000002 /* Converts array buf, of length len, into a C string according to the flags. @@ -108,4 +108,4 @@ char *gpr_strvec_flatten(gpr_strvec *strs, size_t *total_length); } #endif -#endif /* GRPC_INTERNAL_CORE_SUPPORT_STRING_H */ +#endif /* GRPC_INTERNAL_CORE_SUPPORT_STRING_H */ diff --git a/src/core/support/string_win32.c b/src/core/support/string_win32.c index 27b9f3637a..8ffb0a225e 100644 --- a/src/core/support/string_win32.c +++ b/src/core/support/string_win32.c @@ -99,13 +99,9 @@ LPSTR gpr_tchar_to_char(LPCTSTR input) { return ret; } #else -char *gpr_tchar_to_char(LPTSTR input) { - return gpr_strdup(input); -} +char *gpr_tchar_to_char(LPTSTR input) { return gpr_strdup(input); } -char *gpr_char_to_tchar(LPTSTR input) { - return gpr_strdup(input); -} +char *gpr_char_to_tchar(LPTSTR input) { return gpr_strdup(input); } #endif #endif /* GPR_WIN32 */ diff --git a/src/core/support/string_win32.h b/src/core/support/string_win32.h index 1260aa55c1..e3043656fb 100644 --- a/src/core/support/string_win32.h +++ b/src/core/support/string_win32.h @@ -42,6 +42,6 @@ LPTSTR gpr_char_to_tchar(LPCSTR input); LPSTR gpr_tchar_to_char(LPCTSTR input); -#endif /* GPR_WIN32 */ +#endif /* GPR_WIN32 */ -#endif /* GRPC_INTERNAL_CORE_SUPPORT_STRING_WIN32_H */ +#endif /* GRPC_INTERNAL_CORE_SUPPORT_STRING_WIN32_H */ diff --git a/src/core/support/sync_posix.c b/src/core/support/sync_posix.c index 61572b9a8e..6f078cd4bb 100644 --- a/src/core/support/sync_posix.c +++ b/src/core/support/sync_posix.c @@ -63,7 +63,8 @@ void gpr_cv_destroy(gpr_cv *cv) { GPR_ASSERT(pthread_cond_destroy(cv) == 0); } int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { int err = 0; - if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) == 0) { + if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) == + 0) { err = pthread_cond_wait(cv, mu); } else { struct timespec abs_deadline_ts; diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c index 54f84a46ac..df23492171 100644 --- a/src/core/support/sync_win32.c +++ b/src/core/support/sync_win32.c @@ -83,7 +83,8 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { int timeout = 0; DWORD timeout_max_ms; mu->locked = 0; - if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) == 0) { + if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) == + 0) { SleepConditionVariableCS(cv, &mu->cs, INFINITE); } else { gpr_timespec now = gpr_now(abs_deadline.clock_type); diff --git a/src/core/support/thd.c b/src/core/support/thd.c index ec308f3119..32c0db5b66 100644 --- a/src/core/support/thd.c +++ b/src/core/support/thd.c @@ -37,9 +37,7 @@ #include <grpc/support/thd.h> -enum { - GPR_THD_JOINABLE = 1 -}; +enum { GPR_THD_JOINABLE = 1 }; gpr_thd_options gpr_thd_options_default(void) { gpr_thd_options options; diff --git a/src/core/support/thd_internal.h b/src/core/support/thd_internal.h index 4683c37742..1508c4691f 100644 --- a/src/core/support/thd_internal.h +++ b/src/core/support/thd_internal.h @@ -36,4 +36,4 @@ /* Internal interfaces between modules within the gpr support library. */ -#endif /* GRPC_INTERNAL_CORE_SUPPORT_THD_INTERNAL_H */ +#endif /* GRPC_INTERNAL_CORE_SUPPORT_THD_INTERNAL_H */ diff --git a/src/core/support/thd_posix.c b/src/core/support/thd_posix.c index fa4eb50556..c36d94d044 100644 --- a/src/core/support/thd_posix.c +++ b/src/core/support/thd_posix.c @@ -69,9 +69,11 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, GPR_ASSERT(pthread_attr_init(&attr) == 0); if (gpr_thd_options_is_detached(options)) { - GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0); + GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == + 0); } else { - GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0); + GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == + 0); } thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0); GPR_ASSERT(pthread_attr_destroy(&attr) == 0); @@ -82,12 +84,8 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, return thread_started; } -gpr_thd_id gpr_thd_currentid(void) { - return (gpr_thd_id)pthread_self(); -} +gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); } -void gpr_thd_join(gpr_thd_id t) { - pthread_join((pthread_t)t, NULL); -} +void gpr_thd_join(gpr_thd_id t) { pthread_join((pthread_t)t, NULL); } #endif /* GPR_POSIX_SYNC */ diff --git a/src/core/support/thd_win32.c b/src/core/support/thd_win32.c index 4fa3907444..a9db180c1b 100644 --- a/src/core/support/thd_win32.c +++ b/src/core/support/thd_win32.c @@ -105,9 +105,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, return handle != NULL; } -gpr_thd_id gpr_thd_currentid(void) { - return (gpr_thd_id)g_thd_info; -} +gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; } void gpr_thd_join(gpr_thd_id t) { struct thd_info *info = (struct thd_info *)t; diff --git a/src/core/support/time.c b/src/core/support/time.c index b523ae01cc..929adac918 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -315,5 +315,6 @@ gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type clock_type) { return gpr_time_add(gpr_now(clock_type), t); } - return gpr_time_add(gpr_now(clock_type), gpr_time_sub(t, gpr_now(t.clock_type))); + return gpr_time_add(gpr_now(clock_type), + gpr_time_sub(t, gpr_now(t.clock_type))); } diff --git a/src/core/support/tls_pthread.c b/src/core/support/tls_pthread.c index f2e76a553f..2d28226fc4 100644 --- a/src/core/support/tls_pthread.c +++ b/src/core/support/tls_pthread.c @@ -38,7 +38,7 @@ #include <grpc/support/tls.h> gpr_intptr gpr_tls_set(struct gpr_pthread_thread_local *tls, gpr_intptr value) { - GPR_ASSERT(0 == pthread_setspecific(tls->key, (void*)value)); + GPR_ASSERT(0 == pthread_setspecific(tls->key, (void *)value)); return value; } diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h index f01958984f..2c3b22d24e 100644 --- a/src/core/surface/byte_buffer_queue.h +++ b/src/core/surface/byte_buffer_queue.h @@ -59,4 +59,4 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q); void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q); -#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */ +#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 6a1a6cbf30..33f277da46 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -39,6 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <grpc/support/useful.h> #include "src/core/channel/channel_stack.h" #include "src/core/iomgr/alarm.h" @@ -242,6 +243,9 @@ struct grpc_call { /* Compression algorithm for the call */ grpc_compression_algorithm compression_algorithm; + /* Supported encodings (compression algorithms), a bitset */ + gpr_uint32 encodings_accepted_by_peer; + /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -272,7 +276,8 @@ struct grpc_call { /** completion events - for completion queue use */ grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; - /** siblings: children of the same parent form a list, and this list is protected under + /** siblings: children of the same parent form a list, and this list is + protected under parent->mu */ grpc_call *sibling_next; grpc_call *sibling_prev; @@ -394,7 +399,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, } else { call->sibling_next = parent_call->first_child; call->sibling_prev = parent_call->first_child->sibling_prev; - call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call; + call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = + call; } gpr_mu_unlock(&parent_call->mu); @@ -532,6 +538,45 @@ grpc_compression_algorithm grpc_call_get_compression_algorithm( return call->compression_algorithm; } +static void set_encodings_accepted_by_peer( + grpc_call *call, const gpr_slice accept_encoding_slice) { + size_t i; + grpc_compression_algorithm algorithm; + gpr_slice_buffer accept_encoding_parts; + + gpr_slice_buffer_init(&accept_encoding_parts); + gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts); + + /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already + * zeroes the whole grpc_call */ + /* Always support no compression */ + GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); + for (i = 0; i < accept_encoding_parts.count; i++) { + const gpr_slice *accept_encoding_entry_slice = + &accept_encoding_parts.slices[i]; + if (grpc_compression_algorithm_parse( + (const char *)GPR_SLICE_START_PTR(*accept_encoding_entry_slice), + GPR_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) { + GPR_BITSET(&call->encodings_accepted_by_peer, algorithm); + } else { + char *accept_encoding_entry_str = + gpr_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII); + gpr_log(GPR_ERROR, + "Invalid entry in accept encoding metadata: '%s'. Ignoring.", + accept_encoding_entry_str); + gpr_free(accept_encoding_entry_str); + } + } +} + +gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call) { + return call->encodings_accepted_by_peer; +} + +gpr_uint32 grpc_call_get_message_flags(const grpc_call *call) { + return call->incoming_message_flags; +} + static void set_status_details(grpc_call *call, status_source source, grpc_mdstr *status) { if (call->status[source].details != NULL) { @@ -1280,7 +1325,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, const char *description, void *reserved) { grpc_call_error r; - (void) reserved; + (void)reserved; lock(c); r = cancel_with_status(c, status, description); unlock(c); @@ -1408,10 +1453,11 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) { void *user_data = grpc_mdelem_get_user_data(md, destroy_compression); if (user_data) { algorithm = - ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; + ((grpc_compression_algorithm)(gpr_intptr)user_data) - COMPRESS_OFFSET; } else { const char *md_c_str = grpc_mdstr_as_c_string(md->value); - if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) { + if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), + &algorithm)) { gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); assert(0); } @@ -1440,6 +1486,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { } else if (key == grpc_channel_get_compression_algorithm_string(call->channel)) { set_compression_algorithm(call, decode_compression(md)); + } else if (key == grpc_channel_get_encodings_accepted_by_peer_string( + call->channel)) { + set_encodings_accepted_by_peer(call, md->value->slice); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { @@ -1524,7 +1573,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, const grpc_op *op; grpc_ioreq *req; void (*finish_func)(grpc_call *, int, void *) = finish_batch; - GPR_ASSERT(!reserved); + + if (reserved != NULL) return GRPC_CALL_ERROR; GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); @@ -1539,12 +1589,13 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, /* rewrite batch ops into ioreq ops */ for (in = 0, out = 0; in < nops; in++) { op = &ops[in]; + if (op->reserved != NULL) return GRPC_CALL_ERROR; switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: /* Flag validation: currently allow no flags */ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; req->data.send_metadata.count = op->data.send_initial_metadata.count; req->data.send_metadata.metadata = @@ -1559,7 +1610,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return GRPC_CALL_ERROR_INVALID_MESSAGE; } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; req->flags = op->flags; @@ -1571,7 +1622,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_CLOSE; req->flags = op->flags; break; @@ -1582,7 +1633,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return GRPC_CALL_ERROR_NOT_ON_CLIENT; } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; req->flags = op->flags; req->data.send_metadata.count = @@ -1590,7 +1641,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->data.send_metadata.metadata = op->data.send_status_from_server.trailing_metadata; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_STATUS; req->data.send_status.code = op->data.send_status_from_server.status; req->data.send_status.details = @@ -1600,7 +1651,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.send_status_from_server.status_details, 0) : NULL; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_CLOSE; break; case GRPC_OP_RECV_INITIAL_METADATA: @@ -1610,7 +1661,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; req->data.recv_metadata = op->data.recv_initial_metadata; req->data.recv_metadata->count = 0; @@ -1620,7 +1671,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, /* Flag validation: currently allow no flags */ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_MESSAGE; req->data.recv_message = op->data.recv_message; req->flags = op->flags; @@ -1632,26 +1683,26 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_STATUS; req->flags = op->flags; req->data.recv_status.set_value = set_status_value_directly; req->data.recv_status.user_data = op->data.recv_status_on_client.status; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_STATUS_DETAILS; req->data.recv_status_details.details = op->data.recv_status_on_client.status_details; req->data.recv_status_details.details_capacity = op->data.recv_status_on_client.status_details_capacity; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_TRAILING_METADATA; req->data.recv_metadata = op->data.recv_status_on_client.trailing_metadata; req->data.recv_metadata->count = 0; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_CLOSE; finish_func = finish_batch_with_close; break; @@ -1659,14 +1710,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, /* Flag validation: currently allow no flags */ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_STATUS; req->flags = op->flags; req->data.recv_status.set_value = set_cancelled_value; req->data.recv_status.user_data = op->data.recv_close_on_server.cancelled; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_CLOSE; finish_func = finish_batch_with_close; break; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 75bdbce980..00638e43b5 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -38,6 +38,10 @@ #include "src/core/channel/context.h" #include <grpc/grpc.h> +#ifdef __cplusplus +extern "C" { +#endif + /* Primitive operation types - grpc_op's get rewritten into these */ typedef enum { GRPC_IOREQ_RECV_INITIAL_METADATA, @@ -162,4 +166,19 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem); gpr_uint8 grpc_call_is_client(grpc_call *call); +grpc_compression_algorithm grpc_call_get_compression_algorithm( + const grpc_call *call); + +gpr_uint32 grpc_call_get_message_flags(const grpc_call *call); + +/** Returns a bitset for the encodings (compression algorithms) supported by \a + * call's peer. + * + * To be indexed by grpc_compression_algorithm enum values. */ +gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call); + +#ifdef __cplusplus +} +#endif + #endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c index 7bf8cafc24..5a3ef1e5f4 100644 --- a/src/core/surface/call_log_batch.c +++ b/src/core/surface/call_log_batch.c @@ -41,7 +41,7 @@ int grpc_trace_batch = 0; static void add_metadata(gpr_strvec *b, const grpc_metadata *md, size_t count) { size_t i; - for(i = 0; i < count; i++) { + for (i = 0; i < count; i++) { gpr_strvec_add(b, gpr_strdup("\nkey=")); gpr_strvec_add(b, gpr_strdup(md[i].key)); @@ -113,8 +113,9 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, char *tmp; size_t i; gpr_log(file, line, severity, - "grpc_call_start_batch(call=%p, ops=%p, nops=%d, tag=%p)", call, ops, nops, tag); - for(i = 0; i < nops; i++) { + "grpc_call_start_batch(call=%p, ops=%p, nops=%d, tag=%p)", call, ops, + nops, tag); + for (i = 0; i < nops; i++) { tmp = grpc_op_string(&ops[i]); gpr_log(file, line, severity, "ops[%d]: %s", i, tmp); gpr_free(tmp); @@ -123,8 +124,7 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, void grpc_server_log_request_call(char *file, int line, gpr_log_severity severity, - grpc_server *server, - grpc_call **call, + grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, grpc_completion_queue *cq_bound_to_call, @@ -133,8 +133,9 @@ void grpc_server_log_request_call(char *file, int line, gpr_log(file, line, severity, "grpc_server_request_call(server=%p, call=%p, details=%p, " "initial_metadata=%p, cq_bound_to_call=%p, cq_for_notification=%p, " - "tag=%p)", server, call, details, initial_metadata, - cq_bound_to_call, cq_for_notification, tag); + "tag=%p)", + server, call, details, initial_metadata, cq_bound_to_call, + cq_for_notification, tag); } void grpc_server_log_shutdown(char *file, int line, gpr_log_severity severity, diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 308572c634..e50251566d 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -66,6 +66,7 @@ struct grpc_channel { /** mdstr for the grpc-status key */ grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_compression_algorithm_string; + grpc_mdstr *grpc_encodings_accepted_by_peer_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; @@ -104,7 +105,10 @@ grpc_channel *grpc_channel_create_from_filters( channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0); channel->grpc_compression_algorithm_string = grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); - channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message", 0); + channel->grpc_encodings_accepted_by_peer_string = + grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0); + channel->grpc_message_string = + grpc_mdstr_from_string(mdctx, "grpc-message", 0); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { char buf[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(i, buf); @@ -159,7 +163,7 @@ static grpc_call *grpc_channel_create_call_internal( send_metadata[num_metadata++] = authority_mdelem; } - return grpc_call_create(channel, parent_call, propagation_mask, cq, NULL, + return grpc_call_create(channel, parent_call, propagation_mask, cq, NULL, send_metadata, num_metadata, deadline); } @@ -175,10 +179,11 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), grpc_mdstr_from_string(channel->metadata_context, method, 0)), - host ? - grpc_mdelem_from_metadata_strings( - channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string), - grpc_mdstr_from_string(channel->metadata_context, host, 0)) : NULL, + host ? grpc_mdelem_from_metadata_strings( + channel->metadata_context, + GRPC_MDSTR_REF(channel->authority_string), + grpc_mdstr_from_string(channel->metadata_context, host, 0)) + : NULL, deadline); } @@ -189,9 +194,12 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, rc->path = grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), grpc_mdstr_from_string(channel->metadata_context, method, 0)); - rc->authority = host ? grpc_mdelem_from_metadata_strings( - channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string), - grpc_mdstr_from_string(channel->metadata_context, host, 0)) : NULL; + rc->authority = + host ? grpc_mdelem_from_metadata_strings( + channel->metadata_context, + GRPC_MDSTR_REF(channel->authority_string), + grpc_mdstr_from_string(channel->metadata_context, host, 0)) + : NULL; gpr_mu_lock(&channel->registered_call_mu); rc->next = channel->registered_calls; channel->registered_calls = rc; @@ -206,8 +214,8 @@ grpc_call *grpc_channel_create_registered_call( registered_call *rc = registered_call_handle; GPR_ASSERT(!reserved); return grpc_channel_create_call_internal( - channel, parent_call, propagation_mask, completion_queue, - GRPC_MDELEM_REF(rc->path), + channel, parent_call, propagation_mask, completion_queue, + GRPC_MDELEM_REF(rc->path), rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline); } @@ -230,6 +238,7 @@ static void destroy_channel(void *p, int ok) { } GRPC_MDSTR_UNREF(channel->grpc_status_string); GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string); + GRPC_MDSTR_UNREF(channel->grpc_encodings_accepted_by_peer_string); GRPC_MDSTR_UNREF(channel->grpc_message_string); GRPC_MDSTR_UNREF(channel->path_string); GRPC_MDSTR_UNREF(channel->authority_string); @@ -290,6 +299,11 @@ grpc_mdstr *grpc_channel_get_compression_algorithm_string( return channel->grpc_compression_algorithm_string; } +grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( + grpc_channel *channel) { + return channel->grpc_encodings_accepted_by_peer_string; +} + grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { return GRPC_MDELEM_REF(channel->grpc_status_elem[i]); diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 9e0646efaa..f271616f60 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -56,6 +56,8 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_compression_algorithm_string( grpc_channel *channel); +grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( + grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 1223706457..88a7c16598 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -77,9 +77,10 @@ typedef struct { } state_watcher; static void delete_state_watcher(state_watcher *w) { - grpc_channel_element *client_channel_elem = - grpc_channel_stack_last_element(grpc_channel_get_channel_stack(w->channel)); - grpc_client_channel_del_interested_party(client_channel_elem, grpc_cq_pollset(w->cq)); + grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(w->channel)); + grpc_client_channel_del_interested_party(client_channel_elem, + grpc_cq_pollset(w->cq)); GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity"); gpr_mu_destroy(&w->mu); gpr_free(w); @@ -166,9 +167,9 @@ void grpc_channel_watch_connectivity_state( w->tag = tag; w->channel = channel; - grpc_alarm_init( - &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_alarm_init(&w->alarm, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); if (client_channel_elem->filter != &grpc_client_channel_filter) { gpr_log(GPR_ERROR, @@ -178,7 +179,8 @@ void grpc_channel_watch_connectivity_state( grpc_iomgr_add_delayed_callback(&w->on_complete, 1); } else { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); - grpc_client_channel_add_interested_party(client_channel_elem, grpc_cq_pollset(cq)); + grpc_client_channel_add_interested_party(client_channel_elem, + grpc_cq_pollset(cq)); grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state, &w->on_complete); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 378b3f71a1..b58115a93f 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -167,10 +167,12 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, } grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, - gpr_timespec deadline, - void *reserved) { + gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_pollset_worker worker; + int first_loop = 1; + gpr_timespec now; + GPR_ASSERT(!reserved); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); @@ -197,12 +199,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ret.type = GRPC_QUEUE_SHUTDOWN; break; } - if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) { + now = gpr_now(GPR_CLOCK_MONOTONIC); + if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; } + first_loop = 0; + grpc_pollset_work(&cc->pollset, &worker, now, deadline); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); @@ -240,6 +245,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker worker; + gpr_timespec now; + int first_loop = 1; + GPR_ASSERT(!reserved); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); @@ -272,8 +280,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, break; } if (!add_plucker(cc, tag, &worker)) { - gpr_log(GPR_DEBUG, - "Too many outstanding grpc_completion_queue_pluck calls: maximum is %d", + gpr_log(GPR_DEBUG, + "Too many outstanding grpc_completion_queue_pluck calls: maximum " + "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); @@ -281,13 +290,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ret.type = GRPC_QUEUE_TIMEOUT; break; } - if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) { + now = gpr_now(GPR_CLOCK_MONOTONIC); + if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; } + first_loop = 0; + grpc_pollset_work(&cc->pollset, &worker, now, deadline); del_plucker(cc, tag, &worker); } done: diff --git a/src/core/surface/event_string.h b/src/core/surface/event_string.h index e8a8f93518..07c474e3a0 100644 --- a/src/core/surface/event_string.h +++ b/src/core/surface/event_string.h @@ -39,4 +39,4 @@ /* Returns a string describing an event. Must be later freed with gpr_free() */ char *grpc_event_string(grpc_event *ev); -#endif /* GRPC_INTERNAL_CORE_SURFACE_EVENT_STRING_H */ +#endif /* GRPC_INTERNAL_CORE_SURFACE_EVENT_STRING_H */ diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 442bc72f21..d9044549f2 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -33,8 +33,11 @@ #include <grpc/support/port_platform.h> +#include <memory.h> + #include <grpc/census.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/time.h> #include "src/core/channel/channel_stack.h" #include "src/core/client_config/resolver_registry.h" @@ -49,6 +52,8 @@ #include "src/core/transport/chttp2_transport.h" #include "src/core/transport/connectivity_state.h" +#define MAX_PLUGINS 128 + static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; static int g_initializations; @@ -58,7 +63,23 @@ static void do_basic_init(void) { g_initializations = 0; } +typedef struct grpc_plugin { + void (*init)(); + void (*destroy)(); +} grpc_plugin; + +static grpc_plugin g_all_of_the_plugins[MAX_PLUGINS]; +static int g_number_of_plugins = 0; + +void grpc_register_plugin(void (*init)(void), void (*destroy)(void)) { + GPR_ASSERT(g_number_of_plugins != MAX_PLUGINS); + g_all_of_the_plugins[g_number_of_plugins].init = init; + g_all_of_the_plugins[g_number_of_plugins].destroy = destroy; + g_number_of_plugins++; +} + void grpc_init(void) { + int i; gpr_once_init(&g_basic_init, do_basic_init); gpr_mu_lock(&g_init_mu); @@ -87,11 +108,17 @@ void grpc_init(void) { } } grpc_timers_global_init(); + for (i = 0; i < g_number_of_plugins; i++) { + if (g_all_of_the_plugins[i].init != NULL) { + g_all_of_the_plugins[i].init(); + } + } } gpr_mu_unlock(&g_init_mu); } void grpc_shutdown(void) { + int i; gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { grpc_iomgr_shutdown(); @@ -99,6 +126,11 @@ void grpc_shutdown(void) { grpc_timers_global_destroy(); grpc_tracer_shutdown(); grpc_resolver_registry_shutdown(); + for (i = 0; i < g_number_of_plugins; i++) { + if (g_all_of_the_plugins[i].destroy != NULL) { + g_all_of_the_plugins[i].destroy(); + } + } } gpr_mu_unlock(&g_init_mu); } diff --git a/src/core/surface/init.h b/src/core/surface/init.h index 416874020d..771c30f412 100644 --- a/src/core/surface/init.h +++ b/src/core/surface/init.h @@ -37,4 +37,4 @@ void grpc_security_pre_init(void); int grpc_is_initialized(void); -#endif /* GRPC_INTERNAL_CORE_SURFACE_INIT_H */ +#endif /* GRPC_INTERNAL_CORE_SURFACE_INIT_H */ diff --git a/src/core/surface/init_unsecure.c b/src/core/surface/init_unsecure.c index ddb70cef8e..630d564a7d 100644 --- a/src/core/surface/init_unsecure.c +++ b/src/core/surface/init_unsecure.c @@ -33,5 +33,4 @@ #include "src/core/surface/init.h" -void grpc_security_pre_init(void) { -} +void grpc_security_pre_init(void) {} diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index c4215a2cfb..80704cbf67 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -50,6 +50,8 @@ typedef struct { typedef struct { grpc_mdctx *mdctx; grpc_channel *master; + grpc_status_code error_code; + const char *error_message; } channel_data; static void lame_start_transport_stream_op(grpc_call_element *elem, @@ -64,11 +66,11 @@ static void lame_start_transport_stream_op(grpc_call_element *elem, if (op->recv_ops != NULL) { char tmp[GPR_LTOA_MIN_BUFSIZE]; grpc_metadata_batch mdb; - gpr_ltoa(GRPC_STATUS_UNKNOWN, tmp); + gpr_ltoa(chand->error_code, tmp); calld->status.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp); calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message", - "Rpc sent on a lame channel."); + chand->error_message); calld->status.prev = calld->details.next = NULL; calld->status.next = &calld->details; calld->details.prev = &calld->status; @@ -138,8 +140,21 @@ static const grpc_channel_filter lame_filter = { "lame-client", }; -grpc_channel *grpc_lame_client_channel_create(const char *target) { +#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) + +grpc_channel *grpc_lame_client_channel_create(const char *target, + grpc_status_code error_code, + const char *error_message) { + grpc_channel *channel; + grpc_channel_element *elem; + channel_data *chand; static const grpc_channel_filter *filters[] = {&lame_filter}; - return grpc_channel_create_from_filters(target, filters, 1, NULL, - grpc_mdctx_create(), 1); + channel = grpc_channel_create_from_filters(target, filters, 1, NULL, + grpc_mdctx_create(), 1); + elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); + GPR_ASSERT(elem->filter == &lame_filter); + chand = (channel_data *)elem->channel_data; + chand->error_code = error_code; + chand->error_message = error_message; + return channel; } diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index c3150250b8..5b03ba95a7 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -199,13 +199,17 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, if (grpc_find_security_connector_in_args(args) != NULL) { gpr_log(GPR_ERROR, "Cannot set security context in channel args."); - return grpc_lame_client_channel_create(target); + return grpc_lame_client_channel_create( + target, GRPC_STATUS_INVALID_ARGUMENT, + "Security connector exists in channel args."); } if (grpc_credentials_create_security_connector( creds, target, args, NULL, &connector, &new_args_from_connector) != GRPC_SECURITY_OK) { - return grpc_lame_client_channel_create(target); + return grpc_lame_client_channel_create( + target, GRPC_STATUS_INVALID_ARGUMENT, + "Failed to create security connector."); } mdctx = grpc_mdctx_create(); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index f883275951..1c402418e8 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -712,7 +712,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, chand->server = NULL; chand->channel = NULL; chand->path_key = grpc_mdstr_from_string(metadata_context, ":path", 0); - chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority", 0); + chand->authority_key = + grpc_mdstr_from_string(metadata_context, ":authority", 0); chand->next = chand->prev = chand; chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; @@ -974,6 +975,11 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_transport_perform_op(transport, &op); } +void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) { + (void) done_arg; + gpr_free(storage); +} + void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; @@ -985,6 +991,12 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); grpc_cq_begin_op(cq); + if (server->shutdown_published) { + grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL, + gpr_malloc(sizeof(grpc_cq_completion))); + gpr_mu_unlock(&server->mu_global); + return; + } server->shutdown_tags = gpr_realloc(server->shutdown_tags, sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)); @@ -1134,6 +1146,7 @@ grpc_call_error grpc_server_request_call( return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; } grpc_cq_begin_op(cq_for_notification); + details->reserved = NULL; rc->type = BATCH_CALL; rc->server = server; rc->tag = tag; diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index 9237eb5a90..fc7ae820f5 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -38,7 +38,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { const grpc_channel_filter *filters[] = {&grpc_compress_filter}; - (void) reserved; + (void)reserved; return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters), args); } diff --git a/src/core/surface/surface_trace.h b/src/core/surface/surface_trace.h index 01302bb5d4..2b4728e2b4 100644 --- a/src/core/surface/surface_trace.h +++ b/src/core/surface/surface_trace.h @@ -40,10 +40,10 @@ extern int grpc_surface_trace; #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \ - if (grpc_surface_trace) { \ + if (grpc_surface_trace) { \ char *_ev = grpc_event_string(event); \ gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \ gpr_free(_ev); \ } -#endif /* GRPC_INTERNAL_CORE_SURFACE_SURFACE_TRACE_H */ +#endif /* GRPC_INTERNAL_CORE_SURFACE_SURFACE_TRACE_H */ diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 40bf2ebd79..474c3d5ee6 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -92,10 +92,10 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( p->frame_type = *cur; switch (p->frame_type) { case 0: - p->is_frame_compressed = 0; /* GPR_FALSE */ + p->is_frame_compressed = 0; /* GPR_FALSE */ break; case 1: - p->is_frame_compressed = 1; /* GPR_TRUE */ + p->is_frame_compressed = 1; /* GPR_TRUE */ break; default: gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index d84960009b..dc5eb18e42 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -177,10 +177,9 @@ void grpc_chttp2_publish_reads( "parsed", transport_parsing, stream_global, max_recv_bytes, -(gpr_int64)stream_parsing->incoming_window_delta); stream_global->incoming_window -= stream_parsing->incoming_window_delta; - GPR_ASSERT(stream_global->max_recv_bytes >= - stream_parsing->incoming_window_delta); - stream_global->max_recv_bytes -= - stream_parsing->incoming_window_delta; + GPR_ASSERT(stream_global->max_recv_bytes >= + stream_parsing->incoming_window_delta); + stream_global->max_recv_bytes -= stream_parsing->incoming_window_delta; stream_parsing->incoming_window_delta = 0; grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 0f04169741..1ea697f71e 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -66,6 +66,8 @@ typedef struct { size_t header_idx; /* was the last frame emitted a header? (if yes, we'll need a CONTINUATION */ gpr_uint8 last_was_header; + /* have we seen a regular (non-colon-prefixed) header yet? */ + gpr_uint8 seen_regular_header; /* output stream id */ gpr_uint32 stream_id; gpr_slice_buffer *output; @@ -361,6 +363,15 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c, gpr_uint32 indices_key; int should_add_elem; + GPR_ASSERT (GPR_SLICE_LENGTH(elem->key->slice) > 0); + if (GPR_SLICE_START_PTR(elem->key->slice)[0] != ':') { /* regular header */ + st->seen_regular_header = 1; + } else if (st->seen_regular_header != 0) { /* reserved header */ + gpr_log(GPR_ERROR, + "Reserved header (colon-prefixed) happening after regular ones."); + abort(); + } + inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, c->filter_elems); /* is this elem currently in the decoders table? */ @@ -566,6 +577,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, st.cur_frame_type = NONE; st.last_was_header = 0; + st.seen_regular_header = 0; st.stream_id = stream_id; st.output = output; diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 9c3ad7a777..38c6052f9c 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -363,7 +363,7 @@ void grpc_chttp2_register_stream(grpc_chttp2_transport *t, } int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { + grpc_chttp2_stream *s) { stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); } diff --git a/src/core/transport/chttp2/stream_map.c b/src/core/transport/chttp2/stream_map.c index 0ec2f27291..bd16153ed1 100644 --- a/src/core/transport/chttp2/stream_map.c +++ b/src/core/transport/chttp2/stream_map.c @@ -123,8 +123,7 @@ void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, dst->values = gpr_realloc(dst->values, dst->capacity * sizeof(void *)); } memcpy(dst->keys + dst->count, src->keys, src->count * sizeof(gpr_uint32)); - memcpy(dst->values + dst->count, src->values, - src->count * sizeof(void*)); + memcpy(dst->values + dst->count, src->values, src->count * sizeof(void *)); dst->count += src->count; dst->free += src->free; src->count = 0; diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index b55e81fdca..123061b3fc 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -112,13 +112,18 @@ int grpc_chttp2_unlocking_check_writes( } } - if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) { - stream_writing->announce_window = stream_global->unannounced_incoming_window; - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, - incoming_window, stream_global->unannounced_incoming_window); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, - unannounced_incoming_window, -(gpr_int64)stream_global->unannounced_incoming_window); - stream_global->incoming_window += stream_global->unannounced_incoming_window; + if (!stream_global->read_closed && + stream_global->unannounced_incoming_window > 0) { + stream_writing->announce_window = + stream_global->unannounced_incoming_window; + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "write", transport_global, stream_global, incoming_window, + stream_global->unannounced_incoming_window); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "write", transport_global, stream_global, unannounced_incoming_window, + -(gpr_int64)stream_global->unannounced_incoming_window); + stream_global->incoming_window += + stream_global->unannounced_incoming_window; stream_global->unannounced_incoming_window = 0; grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); @@ -179,18 +184,20 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { while ( grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { - if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { + if (stream_writing->sopb.nops > 0 || + stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, - stream_writing->id, &transport_writing->hpack_compressor, + stream_writing->id, + &transport_writing->hpack_compressor, &transport_writing->outbuf); stream_writing->sopb.nops = 0; } if (stream_writing->announce_window > 0) { gpr_slice_buffer_add( &transport_writing->outbuf, - grpc_chttp2_window_update_create( - stream_writing->id, stream_writing->announce_window)); + grpc_chttp2_window_update_create(stream_writing->id, + stream_writing->announce_window)); stream_writing->announce_window = 0; } if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index a9f91b64d5..1bbd210e46 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -116,7 +116,7 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global, static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset); static void add_to_pollset_set_locked(grpc_chttp2_transport *t, - grpc_pollset_set *pollset_set); + grpc_pollset_set *pollset_set); /** Start new streams that have been created if we can */ static void maybe_start_some_streams( @@ -368,11 +368,10 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->global.outgoing_window = t->global.settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->global.max_recv_bytes = - s->parsing.incoming_window = + s->global.max_recv_bytes = s->parsing.incoming_window = s->global.incoming_window = - t->global.settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + t->global.settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s); s->global.in_stream_map = 1; @@ -580,7 +579,7 @@ static void maybe_start_some_streams( stream_global->incoming_window = transport_global->settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - stream_global->max_recv_bytes = + stream_global->max_recv_bytes = GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes); grpc_chttp2_stream_map_add( &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, @@ -590,7 +589,6 @@ static void maybe_start_some_streams( grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); - } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -648,12 +646,14 @@ static void perform_stream_op_locked( stream_global->publish_sopb->nops = 0; stream_global->publish_state = op->recv_state; if (stream_global->max_recv_bytes < op->max_recv_bytes) { - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global, - max_recv_bytes, op->max_recv_bytes - stream_global->max_recv_bytes); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "op", transport_global, stream_global, max_recv_bytes, + op->max_recv_bytes - stream_global->max_recv_bytes); GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( "op", transport_global, stream_global, unannounced_incoming_window, op->max_recv_bytes - stream_global->max_recv_bytes); - stream_global->unannounced_incoming_window += op->max_recv_bytes - stream_global->max_recv_bytes; + stream_global->unannounced_incoming_window += + op->max_recv_bytes - stream_global->max_recv_bytes; stream_global->max_recv_bytes = op->max_recv_bytes; } grpc_chttp2_incoming_metadata_live_op_buffer_end( @@ -1175,7 +1175,7 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t, } static void add_to_pollset_set_locked(grpc_chttp2_transport *t, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set) { if (t->ep) { grpc_endpoint_add_to_pollset_set(t->ep, pollset_set); } diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 44d32b6cb2..f92e87e9dd 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -133,8 +133,8 @@ static void unlock(grpc_mdctx *ctx) { case), since otherwise we can be stuck waiting for a garbage collection that will never happen. */ if (ctx->refs == 0) { - /* uncomment if you're having trouble diagnosing an mdelem leak to make - things clearer (slows down destruction a lot, however) */ +/* uncomment if you're having trouble diagnosing an mdelem leak to make + things clearer (slows down destruction a lot, however) */ #ifdef GRPC_METADATA_REFCOUNT_DEBUG gc_mdtab(ctx); #endif @@ -311,7 +311,8 @@ static void slice_unref(void *p) { unlock(ctx); } -grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str, int canonicalize_key) { +grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str, + int canonicalize_key) { if (canonicalize_key) { size_t len; size_t i; @@ -522,9 +523,9 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx, grpc_mdelem *grpc_mdelem_from_strings(grpc_mdctx *ctx, const char *key, const char *value) { - return grpc_mdelem_from_metadata_strings(ctx, - grpc_mdstr_from_string(ctx, key, 0), - grpc_mdstr_from_string(ctx, value, 0)); + return grpc_mdelem_from_metadata_strings( + ctx, grpc_mdstr_from_string(ctx, key, 0), + grpc_mdstr_from_string(ctx, value, 0)); } grpc_mdelem *grpc_mdelem_from_slices(grpc_mdctx *ctx, gpr_slice key, diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h index 15ef9bb555..a7af49ba55 100644 --- a/src/core/transport/metadata.h +++ b/src/core/transport/metadata.h @@ -95,7 +95,8 @@ size_t grpc_mdctx_get_mdtab_free_test_only(grpc_mdctx *mdctx); /* Constructors for grpc_mdstr instances; take a variety of data types that clients may have handy */ -grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str, int perform_key_canonicalization); +grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str, + int perform_key_canonicalization); /* Unrefs the slice. */ grpc_mdstr *grpc_mdstr_from_slice(grpc_mdctx *ctx, gpr_slice slice); grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *str, @@ -179,4 +180,4 @@ void grpc_mdctx_unlock(grpc_mdctx *ctx); #define GRPC_MDSTR_KV_HASH(k_hash, v_hash) (GPR_ROTL((k_hash), 2) ^ (v_hash)) -#endif /* GRPC_INTERNAL_CORE_TRANSPORT_METADATA_H */ +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_METADATA_H */ diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 0a9669b0ab..038586d48e 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -203,8 +203,8 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) { #endif /* NDEBUG */ void grpc_metadata_batch_init(grpc_metadata_batch *batch) { - batch->list.head = batch->list.tail = batch->garbage.head = batch->garbage.tail = - NULL; + batch->list.head = batch->list.tail = batch->garbage.head = + batch->garbage.tail = NULL; batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } @@ -288,7 +288,7 @@ void grpc_metadata_batch_merge(grpc_metadata_batch *target, } void grpc_metadata_batch_move(grpc_metadata_batch *dst, - grpc_metadata_batch *src) { + grpc_metadata_batch *src) { *dst = *src; memset(src, 0, sizeof(grpc_metadata_batch)); } diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index 9ce1ddb95e..29127c4269 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -121,7 +121,7 @@ static void store32_little_endian(gpr_uint32 value, unsigned char* buf) { buf[3] = (unsigned char)(value >> 24) & 0xFF; buf[2] = (unsigned char)(value >> 16) & 0xFF; buf[1] = (unsigned char)(value >> 8) & 0xFF; - buf[0] = (unsigned char)(value) & 0xFF; + buf[0] = (unsigned char)(value)&0xFF; } static void tsi_fake_frame_reset(tsi_fake_frame* frame, int needs_draining) { @@ -370,7 +370,8 @@ static void fake_protector_destroy(tsi_frame_protector* self) { static const tsi_frame_protector_vtable frame_protector_vtable = { fake_protector_protect, fake_protector_protect_flush, - fake_protector_unprotect, fake_protector_destroy, }; + fake_protector_unprotect, fake_protector_destroy, +}; /* --- tsi_handshaker methods implementation. ---*/ @@ -393,7 +394,8 @@ static tsi_result fake_handshaker_get_bytes_to_send_to_peer( next_message_to_send = TSI_FAKE_HANDSHAKE_MESSAGE_MAX; } if (tsi_tracing_enabled) { - gpr_log(GPR_INFO, "%s prepared %s.", impl->is_client ? "Client" : "Server", + gpr_log(GPR_INFO, "%s prepared %s.", + impl->is_client ? "Client" : "Server", tsi_fake_handshake_message_to_string(impl->next_message_to_send)); } impl->next_message_to_send = next_message_to_send; @@ -493,7 +495,8 @@ static const tsi_handshaker_vtable handshaker_vtable = { fake_handshaker_get_result, fake_handshaker_extract_peer, fake_handshaker_create_frame_protector, - fake_handshaker_destroy, }; + fake_handshaker_destroy, +}; tsi_handshaker* tsi_create_fake_handshaker(int is_client) { tsi_fake_handshaker* impl = calloc(1, sizeof(tsi_fake_handshaker)); diff --git a/src/core/tsi/fake_transport_security.h b/src/core/tsi/fake_transport_security.h index af9730b90e..1fa11349fb 100644 --- a/src/core/tsi/fake_transport_security.h +++ b/src/core/tsi/fake_transport_security.h @@ -58,4 +58,4 @@ tsi_frame_protector* tsi_create_fake_protector( } #endif -#endif /* GRPC_INTERNAL_CORE_TSI_FAKE_TRANSPORT_SECURITY_H */ +#endif /* GRPC_INTERNAL_CORE_TSI_FAKE_TRANSPORT_SECURITY_H */ diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 609fc06ed5..0b416f6c9d 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -43,7 +43,7 @@ #include "src/core/tsi/transport_security.h" #include <openssl/bio.h> -#include <openssl/crypto.h> /* For OPENSSL_free */ +#include <openssl/crypto.h> /* For OPENSSL_free */ #include <openssl/err.h> #include <openssl/ssl.h> #include <openssl/x509.h> @@ -54,7 +54,6 @@ #define TSI_SSL_MAX_PROTECTED_FRAME_SIZE_UPPER_BOUND 16384 #define TSI_SSL_MAX_PROTECTED_FRAME_SIZE_LOWER_BOUND 1024 - /* Putting a macro like this and littering the source file with #if is really bad practice. TODO(jboeuf): refactor all the #if / #endif in a separate module. */ @@ -116,7 +115,7 @@ typedef struct { /* --- Library Initialization. ---*/ static gpr_once init_openssl_once = GPR_ONCE_INIT; -static gpr_mu *openssl_mutexes = NULL; +static gpr_mu* openssl_mutexes = NULL; static void openssl_locking_cb(int mode, int type, const char* file, int line) { if (mode & CRYPTO_LOCK) { @@ -195,7 +194,7 @@ static void ssl_info_callback(const SSL* ssl, int where, int ret) { /* Returns 1 if name looks like an IP address, 0 otherwise. This is a very rough heuristic as it does not handle IPV6 or things like: 0300.0250.00.01, 0xC0.0Xa8.0x0.0x1, 000030052000001, 0xc0.052000001 */ -static int looks_like_ip_address(const char *name) { +static int looks_like_ip_address(const char* name) { size_t i; size_t dot_count = 0; size_t num_size = 0; @@ -215,7 +214,6 @@ static int looks_like_ip_address(const char *name) { return 1; } - /* Gets the subject CN from an X509 cert. */ static tsi_result ssl_get_x509_common_name(X509* cert, unsigned char** utf8, size_t* utf8_size) { @@ -630,7 +628,8 @@ static tsi_result build_alpn_protocol_name_list( } /* Safety check. */ if ((current < *protocol_name_list) || - ((gpr_uintptr)(current - *protocol_name_list) != *protocol_name_list_length)) { + ((gpr_uintptr)(current - *protocol_name_list) != + *protocol_name_list_length)) { return TSI_INTERNAL_ERROR; } return TSI_OK; @@ -768,7 +767,8 @@ static void ssl_protector_destroy(tsi_frame_protector* self) { static const tsi_frame_protector_vtable frame_protector_vtable = { ssl_protector_protect, ssl_protector_protect_flush, ssl_protector_unprotect, - ssl_protector_destroy, }; + ssl_protector_destroy, +}; /* --- tsi_handshaker methods implementation. ---*/ @@ -948,7 +948,8 @@ static const tsi_handshaker_vtable handshaker_vtable = { ssl_handshaker_get_result, ssl_handshaker_extract_peer, ssl_handshaker_create_frame_protector, - ssl_handshaker_destroy, }; + ssl_handshaker_destroy, +}; /* --- tsi_ssl_handshaker_factory common methods. --- */ @@ -1075,9 +1076,11 @@ static void ssl_client_handshaker_factory_destroy( free(impl); } -static int client_handshaker_factory_npn_callback( - SSL* ssl, unsigned char** out, unsigned char* outlen, - const unsigned char* in, unsigned int inlen, void* arg) { +static int client_handshaker_factory_npn_callback(SSL* ssl, unsigned char** out, + unsigned char* outlen, + const unsigned char* in, + unsigned int inlen, + void* arg) { tsi_ssl_client_handshaker_factory* factory = (tsi_ssl_client_handshaker_factory*)arg; return select_protocol_list((const unsigned char**)out, outlen, @@ -1121,7 +1124,7 @@ static void ssl_server_handshaker_factory_destroy( static int does_entry_match_name(const char* entry, size_t entry_length, const char* name) { - const char *dot; + const char* dot; const char* name_subdomain = NULL; size_t name_length = strlen(name); size_t name_subdomain_length; @@ -1153,7 +1156,7 @@ static int does_entry_match_name(const char* entry, size_t entry_length, if (name_subdomain_length < 2) return 0; name_subdomain++; /* Starts after the dot. */ name_subdomain_length--; - entry += 2; /* Remove *. */ + entry += 2; /* Remove *. */ entry_length -= 2; dot = strchr(name_subdomain, '.'); if ((dot == NULL) || (dot == &name_subdomain[name_subdomain_length - 1])) { @@ -1170,7 +1173,7 @@ static int does_entry_match_name(const char* entry, size_t entry_length, static int ssl_server_handshaker_factory_servername_callback(SSL* ssl, int* ap, void* arg) { tsi_ssl_server_handshaker_factory* impl = - (tsi_ssl_server_handshaker_factory*)arg; + (tsi_ssl_server_handshaker_factory*)arg; size_t i = 0; const char* servername = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name); if (servername == NULL || strlen(servername) == 0) { diff --git a/src/core/tsi/ssl_transport_security.h b/src/core/tsi/ssl_transport_security.h index 4bf6c81b75..cdf4f294be 100644 --- a/src/core/tsi/ssl_transport_security.h +++ b/src/core/tsi/ssl_transport_security.h @@ -170,4 +170,4 @@ int tsi_ssl_peer_matches_name(const tsi_peer* peer, const char* name); } #endif -#endif /* GRPC_INTERNAL_CORE_TSI_SSL_TRANSPORT_SECURITY_H */ +#endif /* GRPC_INTERNAL_CORE_TSI_SSL_TRANSPORT_SECURITY_H */ diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h index 4cd0ec2cfb..34283f2f9c 100644 --- a/src/core/tsi/transport_security.h +++ b/src/core/tsi/transport_security.h @@ -108,4 +108,4 @@ char* tsi_strdup(const char* src); /* Sadly, no strdup in C89. */ } #endif -#endif /* GRPC_INTERNAL_CORE_TSI_TRANSPORT_SECURITY_H */ +#endif /* GRPC_INTERNAL_CORE_TSI_TRANSPORT_SECURITY_H */ diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h index e27e6b9fc9..03a51683a2 100644 --- a/src/core/tsi/transport_security_interface.h +++ b/src/core/tsi/transport_security_interface.h @@ -341,4 +341,4 @@ void tsi_handshaker_destroy(tsi_handshaker* self); } #endif -#endif /* GRPC_INTERNAL_CORE_TSI_TRANSPORT_SECURITY_INTERFACE_H */ +#endif /* GRPC_INTERNAL_CORE_TSI_TRANSPORT_SECURITY_INTERFACE_H */ |