aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Julien Boeuf <jboeuf@google.com>2015-08-26 11:19:10 -0700
committerGravatar Julien Boeuf <jboeuf@google.com>2015-08-26 11:19:10 -0700
commit5b3516e2c5c3c6878d28cd19db73d5572851e6b9 (patch)
treea34bde8ab374f35f5a6a6a4cdbdf741076dbd3fa /src/core
parent9a8d0d3eaf8ccd1c3a6e27f1f8bbeeee85e64345 (diff)
parent1f1919c8cfc61cd9af288cd1585b38b9c3511e1f (diff)
Merge branch 'master' of github.com:grpc/grpc into cpp_auth_md_processor
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/census_filter.h2
-rw-r--r--src/core/channel/channel_args.c63
-rw-r--r--src/core/channel/channel_args.h20
-rw-r--r--src/core/channel/client_channel.c38
-rw-r--r--src/core/channel/client_channel.h7
-rw-r--r--src/core/channel/compress_filter.c57
-rw-r--r--src/core/channel/compress_filter.h2
-rw-r--r--src/core/channel/http_client_filter.h2
-rw-r--r--src/core/channel/http_server_filter.h2
-rw-r--r--src/core/channel/noop_filter.h2
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c3
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c59
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c501
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.h42
-rw-r--r--src/core/client_config/subchannel.h2
-rw-r--r--src/core/client_config/subchannel_factory_decorators/add_channel_arg.c10
-rw-r--r--src/core/client_config/subchannel_factory_decorators/add_channel_arg.h5
-rw-r--r--src/core/client_config/subchannel_factory_decorators/merge_channel_args.c4
-rw-r--r--src/core/client_config/subchannel_factory_decorators/merge_channel_args.h5
-rw-r--r--src/core/compression/algorithm.c15
-rw-r--r--src/core/debug/trace.c8
-rw-r--r--src/core/debug/trace.h2
-rw-r--r--src/core/httpcli/format_request.c6
-rw-r--r--src/core/httpcli/format_request.h2
-rw-r--r--src/core/httpcli/parser.h2
-rw-r--r--src/core/iomgr/alarm.c5
-rw-r--r--src/core/iomgr/alarm.h2
-rw-r--r--src/core/iomgr/alarm_heap.c10
-rw-r--r--src/core/iomgr/alarm_heap.h2
-rw-r--r--src/core/iomgr/alarm_internal.h2
-rw-r--r--src/core/iomgr/endpoint.c3
-rw-r--r--src/core/iomgr/endpoint.h5
-rw-r--r--src/core/iomgr/endpoint_pair.h2
-rw-r--r--src/core/iomgr/endpoint_pair_windows.c20
-rw-r--r--src/core/iomgr/iocp_windows.c35
-rw-r--r--src/core/iomgr/iocp_windows.h10
-rw-r--r--src/core/iomgr/iomgr.h2
-rw-r--r--src/core/iomgr/iomgr_internal.h2
-rw-r--r--src/core/iomgr/iomgr_posix.c2
-rw-r--r--src/core/iomgr/iomgr_posix.h2
-rw-r--r--src/core/iomgr/iomgr_windows.c2
-rw-r--r--src/core/iomgr/pollset.h7
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c5
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c7
-rw-r--r--src/core/iomgr/pollset_posix.c31
-rw-r--r--src/core/iomgr/pollset_posix.h9
-rw-r--r--src/core/iomgr/pollset_windows.c16
-rw-r--r--src/core/iomgr/resolve_address.h2
-rw-r--r--src/core/iomgr/resolve_address_posix.c5
-rw-r--r--src/core/iomgr/sockaddr.h2
-rw-r--r--src/core/iomgr/sockaddr_posix.h2
-rw-r--r--src/core/iomgr/sockaddr_utils.c6
-rw-r--r--src/core/iomgr/sockaddr_utils.h2
-rw-r--r--src/core/iomgr/sockaddr_win32.h2
-rw-r--r--src/core/iomgr/socket_utils_posix.h2
-rw-r--r--src/core/iomgr/socket_windows.c2
-rw-r--r--src/core/iomgr/socket_windows.h4
-rw-r--r--src/core/iomgr/tcp_client.h2
-rw-r--r--src/core/iomgr/tcp_client_posix.c3
-rw-r--r--src/core/iomgr/tcp_posix.c3
-rw-r--r--src/core/iomgr/tcp_posix.h2
-rw-r--r--src/core/iomgr/tcp_server_windows.c12
-rw-r--r--src/core/iomgr/tcp_windows.c59
-rw-r--r--src/core/iomgr/tcp_windows.h2
-rw-r--r--src/core/iomgr/time_averaged_stats.h2
-rw-r--r--src/core/iomgr/udp_server.c440
-rw-r--r--src/core/iomgr/udp_server.h85
-rw-r--r--src/core/iomgr/wakeup_fd_eventfd.c5
-rw-r--r--src/core/iomgr/wakeup_fd_nospecial.c9
-rw-r--r--src/core/iomgr/wakeup_fd_pipe.c2
-rw-r--r--src/core/iomgr/wakeup_fd_pipe.h2
-rw-r--r--src/core/iomgr/wakeup_fd_posix.c6
-rw-r--r--src/core/iomgr/wakeup_fd_posix.h2
-rw-r--r--src/core/json/json.h2
-rw-r--r--src/core/json/json_common.h2
-rw-r--r--src/core/json/json_reader.c36
-rw-r--r--src/core/json/json_reader.h2
-rw-r--r--src/core/json/json_string.c50
-rw-r--r--src/core/json/json_writer.c33
-rw-r--r--src/core/json/json_writer.h14
-rw-r--r--src/core/profiling/timers.h2
-rw-r--r--src/core/security/auth_filters.h2
-rw-r--r--src/core/security/base64.h2
-rw-r--r--src/core/security/client_auth_filter.c8
-rw-r--r--src/core/security/credentials.c20
-rw-r--r--src/core/security/credentials.h5
-rw-r--r--src/core/security/credentials_metadata.c4
-rw-r--r--src/core/security/google_default_credentials.c6
-rw-r--r--src/core/security/json_token.h2
-rw-r--r--src/core/security/jwt_verifier.h1
-rw-r--r--src/core/security/secure_endpoint.c2
-rw-r--r--src/core/security/secure_endpoint.h2
-rw-r--r--src/core/security/secure_transport_setup.h2
-rw-r--r--src/core/security/security_connector.c20
-rw-r--r--src/core/security/security_context.c4
-rw-r--r--src/core/security/security_context.h3
-rw-r--r--src/core/security/server_auth_filter.c31
-rw-r--r--src/core/statistics/census_interface.h2
-rw-r--r--src/core/statistics/census_log.h2
-rw-r--r--src/core/statistics/census_rpc_stats.c4
-rw-r--r--src/core/statistics/census_rpc_stats.h2
-rw-r--r--src/core/statistics/census_tracing.c7
-rw-r--r--src/core/statistics/census_tracing.h2
-rw-r--r--src/core/statistics/hash_table.h2
-rw-r--r--src/core/support/cpu_iphone.c8
-rw-r--r--src/core/support/cpu_linux.c2
-rw-r--r--src/core/support/env.h2
-rw-r--r--src/core/support/file.h2
-rw-r--r--src/core/support/histogram.c11
-rw-r--r--src/core/support/log_linux.c4
-rw-r--r--src/core/support/murmur_hash.h2
-rw-r--r--src/core/support/slice.c3
-rw-r--r--src/core/support/slice_buffer.c3
-rw-r--r--src/core/support/stack_lockfree.c20
-rw-r--r--src/core/support/string.c19
-rw-r--r--src/core/support/string.h4
-rw-r--r--src/core/support/string_win32.c8
-rw-r--r--src/core/support/string_win32.h4
-rw-r--r--src/core/support/sync_posix.c3
-rw-r--r--src/core/support/sync_win32.c3
-rw-r--r--src/core/support/thd.c4
-rw-r--r--src/core/support/thd_internal.h2
-rw-r--r--src/core/support/thd_posix.c14
-rw-r--r--src/core/support/thd_win32.c4
-rw-r--r--src/core/support/time.c3
-rw-r--r--src/core/support/tls_pthread.c2
-rw-r--r--src/core/surface/byte_buffer_queue.h2
-rw-r--r--src/core/surface/call.c91
-rw-r--r--src/core/surface/call.h19
-rw-r--r--src/core/surface/call_log_batch.c15
-rw-r--r--src/core/surface/channel.c36
-rw-r--r--src/core/surface/channel.h2
-rw-r--r--src/core/surface/channel_connectivity.c16
-rw-r--r--src/core/surface/completion_queue.c24
-rw-r--r--src/core/surface/event_string.h2
-rw-r--r--src/core/surface/init.c32
-rw-r--r--src/core/surface/init.h2
-rw-r--r--src/core/surface/init_unsecure.c3
-rw-r--r--src/core/surface/lame_client.c25
-rw-r--r--src/core/surface/secure_channel_create.c8
-rw-r--r--src/core/surface/server.c15
-rw-r--r--src/core/surface/server_create.c2
-rw-r--r--src/core/surface/surface_trace.h4
-rw-r--r--src/core/transport/chttp2/frame_data.c4
-rw-r--r--src/core/transport/chttp2/parsing.c7
-rw-r--r--src/core/transport/chttp2/stream_encoder.c12
-rw-r--r--src/core/transport/chttp2/stream_lists.c2
-rw-r--r--src/core/transport/chttp2/stream_map.c3
-rw-r--r--src/core/transport/chttp2/writing.c29
-rw-r--r--src/core/transport/chttp2_transport.c22
-rw-r--r--src/core/transport/metadata.c13
-rw-r--r--src/core/transport/metadata.h5
-rw-r--r--src/core/transport/stream_op.c6
-rw-r--r--src/core/tsi/fake_transport_security.c11
-rw-r--r--src/core/tsi/fake_transport_security.h2
-rw-r--r--src/core/tsi/ssl_transport_security.c31
-rw-r--r--src/core/tsi/ssl_transport_security.h2
-rw-r--r--src/core/tsi/transport_security.h2
-rw-r--r--src/core/tsi/transport_security_interface.h2
159 files changed, 1995 insertions, 566 deletions
diff --git a/src/core/channel/census_filter.h b/src/core/channel/census_filter.h
index 4f9759f0db..1453c05d28 100644
--- a/src/core/channel/census_filter.h
+++ b/src/core/channel/census_filter.h
@@ -41,4 +41,4 @@
extern const grpc_channel_filter grpc_client_census_filter;
extern const grpc_channel_filter grpc_server_census_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CENSUS_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CENSUS_FILTER_H */
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index c430b56fa2..54ee75af28 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include <string.h>
@@ -146,3 +147,65 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
tmp.value.integer = algorithm;
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
+
+/** Returns 1 if the argument for compression algorithm's enabled states bitset
+ * was found in \a a, returning the arg's value in \a states. Otherwise, returns
+ * 0. */
+static int find_compression_algorithm_states_bitset(
+ const grpc_channel_args *a, int **states_arg) {
+ if (a != NULL) {
+ size_t i;
+ for (i = 0; i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
+ *states_arg = &a->args[i].value.integer;
+ return 1; /* GPR_TRUE */
+ }
+ }
+ }
+ return 0; /* GPR_FALSE */
+}
+
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
+ grpc_channel_args **a,
+ grpc_compression_algorithm algorithm,
+ int state) {
+ int *states_arg;
+ grpc_channel_args *result = *a;
+ const int states_arg_found =
+ find_compression_algorithm_states_bitset(*a, &states_arg);
+
+ if (states_arg_found) {
+ if (state != 0) {
+ GPR_BITSET(states_arg, algorithm);
+ } else {
+ GPR_BITCLEAR(states_arg, algorithm);
+ }
+ } else {
+ /* create a new arg */
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
+ /* all enabled by default */
+ tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
+ if (state != 0) {
+ GPR_BITSET(&tmp.value.integer, algorithm);
+ } else {
+ GPR_BITCLEAR(&tmp.value.integer, algorithm);
+ }
+ result = grpc_channel_args_copy_and_add(*a, &tmp, 1);
+ grpc_channel_args_destroy(*a);
+ *a = result;
+ }
+ return result;
+}
+
+int grpc_channel_args_compression_algorithm_get_states(
+ const grpc_channel_args *a) {
+ int *states_arg;
+ if (find_compression_algorithm_states_bitset(a, &states_arg)) {
+ return *states_arg;
+ } else {
+ return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
+ }
+}
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h
index 7e6ddd3997..06a6012dee 100644
--- a/src/core/channel/channel_args.h
+++ b/src/core/channel/channel_args.h
@@ -67,4 +67,24 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm);
+/** Sets the support for the given compression algorithm. By default, all
+ * compression algorithms are enabled. It's an error to disable an algorithm set
+ * by grpc_channel_args_set_compression_algorithm.
+ *
+ * Returns an instance will the updated algorithm states. The \a a pointer is
+ * modified to point to the returned instance (which may be different from the
+ * input value of \a a). */
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
+ grpc_channel_args **a,
+ grpc_compression_algorithm algorithm,
+ int enabled);
+
+/** Returns the bitset representing the support state (true for enabled, false
+ * for disabled) for compression algorithms.
+ *
+ * The i-th bit of the returned bitset corresponds to the i-th entry in the
+ * grpc_compression_algorithm enum. */
+int grpc_channel_args_compression_algorithm_get_states(
+ const grpc_channel_args *a);
+
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 6c2e6b38a8..2e25033813 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -84,8 +84,10 @@ typedef struct {
grpc_pollset_set pollset_set;
} channel_data;
-/** We create one watcher for each new lb_policy that is returned from a resolver,
- to watch for state changes from the lb_policy. When a state change is seen, we
+/** We create one watcher for each new lb_policy that is returned from a
+ resolver,
+ to watch for state changes from the lb_policy. When a state change is seen,
+ we
update the channel, and create a new watcher */
typedef struct {
channel_data *chand;
@@ -380,7 +382,8 @@ static void perform_transport_stream_op(grpc_call_element *elem,
if (lb_policy) {
grpc_transport_stream_op *op = &calld->waiting_op;
grpc_pollset *bind_pollset = op->bind_pollset;
- grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata;
+ grpc_metadata_batch *initial_metadata =
+ &op->send_ops->ops[0].data.metadata;
GRPC_LB_POLICY_REF(lb_policy, "pick");
gpr_mu_unlock(&chand->mu_config);
calld->state = CALL_WAITING_FOR_PICK;
@@ -388,13 +391,14 @@ static void perform_transport_stream_op(grpc_call_element *elem,
GPR_ASSERT(op->bind_pollset);
GPR_ASSERT(op->send_ops);
GPR_ASSERT(op->send_ops->nops >= 1);
- GPR_ASSERT(
- op->send_ops->ops[0].type == GRPC_OP_METADATA);
+ GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
+ grpc_iomgr_closure_init(&calld->async_setup_task, picked_target,
+ calld);
grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
- &calld->picked_channel, &calld->async_setup_task);
+ &calld->picked_channel,
+ &calld->async_setup_task);
GRPC_LB_POLICY_UNREF(lb_policy, "pick");
} else if (chand->resolver != NULL) {
@@ -430,7 +434,8 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
perform_transport_stream_op(elem, op, 0);
}
-static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state);
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state);
static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
@@ -450,7 +455,8 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
gpr_free(w);
}
-static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) {
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
@@ -499,13 +505,13 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (iomgr_success && chand->resolver) {
grpc_resolver *resolver = chand->resolver;
GRPC_RESOLVER_REF(resolver, "channel-next");
+ grpc_connectivity_state_set(&chand->state_tracker, state,
+ "new_lb+resolver");
gpr_mu_unlock(&chand->mu_config);
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
- grpc_connectivity_state_set(&chand->state_tracker, state,
- "new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(chand, lb_policy, state);
}
@@ -663,7 +669,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
chand);
- grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
+ "client_channel");
}
/* Destructor for channel_data */
@@ -747,19 +754,20 @@ void grpc_client_channel_watch_connectivity_state(
gpr_mu_unlock(&chand->mu_config);
}
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) {
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
+ grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
return &chand->pollset_set;
}
void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
grpc_pollset_set_add_pollset(&chand->pollset_set, pollset);
}
void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
grpc_pollset_set_del_pollset(&chand->pollset_set, pollset);
}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index cd81294eb3..13681e3956 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -59,11 +59,12 @@ void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete);
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem);
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
+ grpc_channel_element *elem);
void grpc_client_channel_add_interested_party(grpc_channel_element *channel,
- grpc_pollset *pollset);
+ grpc_pollset *pollset);
void grpc_client_channel_del_interested_party(grpc_channel_element *channel,
- grpc_pollset *pollset);
+ grpc_pollset *pollset);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 8963c13b0f..762a4edc73 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -35,22 +35,25 @@
#include <string.h>
#include <grpc/compression.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include "src/core/channel/compress_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/compression/message_compress.h"
+#include "src/core/support/string.h"
typedef struct call_data {
gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
+ grpc_linked_mdelem accept_encoding_storage;
int remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */
int written_initial_metadata; /**< Already processed initial md? */
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
- /** If true, contents of \a compression_algorithm are authoritative */
+ /** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
} call_data;
@@ -59,8 +62,12 @@ typedef struct channel_data {
grpc_mdstr *mdstr_request_compression_algorithm_key;
/** Metadata key for the outgoing (used) compression algorithm */
grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
+ /** Metadata key for the accepted encodings */
+ grpc_mdstr *mdstr_compression_capabilities_key;
/** Precomputed metadata elements for all available compression algorithms */
grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
+ /** Precomputed metadata elements for the accepted encodings */
+ grpc_mdelem *mdelem_accept_encoding;
/** The default, channel-level, compression algorithm */
grpc_compression_algorithm default_compression_algorithm;
} channel_data;
@@ -71,7 +78,7 @@ typedef struct channel_data {
*
* Returns 1 if the data was actually compress and 0 otherwise. */
static int compress_send_sb(grpc_compression_algorithm algorithm,
- gpr_slice_buffer *slices) {
+ gpr_slice_buffer *slices) {
int did_compress;
gpr_slice_buffer tmp;
gpr_slice_buffer_init(&tmp);
@@ -86,14 +93,14 @@ static int compress_send_sb(grpc_compression_algorithm algorithm,
/** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
-static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
if (md->key == channeld->mdstr_request_compression_algorithm_key) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
- if (!grpc_compression_algorithm_parse(md_c_str,
+ if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
&calld->compression_algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.",
md_c_str);
@@ -108,10 +115,10 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
static int skip_compression(channel_data *channeld, call_data *calld) {
if (calld->has_compression_algorithm) {
- if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
- return 1;
- }
- return 0; /* we have an actual call-specific algorithm */
+ if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
+ return 1;
+ }
+ return 0; /* we have an actual call-specific algorithm */
}
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
@@ -184,7 +191,7 @@ static void process_send_ops(grpc_call_element *elem,
* given by GRPC_OP_BEGIN_MESSAGE) */
calld->remaining_slice_bytes = sop->data.begin_message.length;
if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
calld->compression_algorithm = GRPC_COMPRESS_NONE;
}
break;
@@ -202,10 +209,17 @@ static void process_send_ops(grpc_call_element *elem,
channeld->default_compression_algorithm;
calld->has_compression_algorithm = 1; /* GPR_TRUE */
}
+ /* hint compression algorithm */
grpc_metadata_batch_add_tail(
&(sop->data.metadata), &calld->compression_algorithm_storage,
GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
[calld->compression_algorithm]));
+
+ /* convey supported compression algorithms */
+ grpc_metadata_batch_add_tail(
+ &(sop->data.metadata), &calld->accept_encoding_storage,
+ GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
+
calld->written_initial_metadata = 1; /* GPR_TRUE */
}
break;
@@ -279,6 +293,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
+ const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
+ char *accept_encoding_str;
+ size_t accept_encoding_str_len;
channeld->default_compression_algorithm =
grpc_channel_args_get_compression_algorithm(args);
@@ -289,6 +306,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channeld->mdstr_outgoing_compression_algorithm_key =
grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
+ channeld->mdstr_compression_capabilities_key =
+ grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0);
+
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
char *algorithm_name;
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
@@ -297,8 +317,22 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
mdctx,
GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorithm_name, 0));
+ if (algo_idx > 0) {
+ supported_algorithms_names[algo_idx - 1] = algorithm_name;
+ }
}
+ /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
+ * arrays, as to avoid the heap allocs */
+ accept_encoding_str = gpr_strjoin_sep(
+ supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names),
+ ", ", &accept_encoding_str_len);
+
+ channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
+ mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
+ grpc_mdstr_from_string(mdctx, accept_encoding_str, 0));
+ gpr_free(accept_encoding_str);
+
GPR_ASSERT(!is_last);
}
@@ -309,10 +343,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
- for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
- ++algo_idx) {
+ GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key);
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
}
+ GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding);
}
const grpc_channel_filter grpc_compress_filter = {
diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h
index 0694e2c1dd..0917e81ca4 100644
--- a/src/core/channel/compress_filter.h
+++ b/src/core/channel/compress_filter.h
@@ -62,4 +62,4 @@
extern const grpc_channel_filter grpc_compress_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */
diff --git a/src/core/channel/http_client_filter.h b/src/core/channel/http_client_filter.h
index 04eb839e00..21c66b9b8e 100644
--- a/src/core/channel/http_client_filter.h
+++ b/src/core/channel/http_client_filter.h
@@ -41,4 +41,4 @@ extern const grpc_channel_filter grpc_http_client_filter;
#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme"
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_CLIENT_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_CLIENT_FILTER_H */
diff --git a/src/core/channel/http_server_filter.h b/src/core/channel/http_server_filter.h
index 42f76ed17f..f219d4e66f 100644
--- a/src/core/channel/http_server_filter.h
+++ b/src/core/channel/http_server_filter.h
@@ -39,4 +39,4 @@
/* Processes metadata on the client side for HTTP2 transports */
extern const grpc_channel_filter grpc_http_server_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_SERVER_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_SERVER_FILTER_H */
diff --git a/src/core/channel/noop_filter.h b/src/core/channel/noop_filter.h
index 96463e5322..ded9b33117 100644
--- a/src/core/channel/noop_filter.h
+++ b/src/core/channel/noop_filter.h
@@ -41,4 +41,4 @@
customize for their own filters */
extern const grpc_channel_filter grpc_no_op_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 827b1a2be5..7b35b7902f 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -219,7 +219,8 @@ static grpc_resolver *dns_create(
default_host_arg.type = GRPC_ARG_STRING;
default_host_arg.key = GRPC_ARG_DEFAULT_AUTHORITY;
default_host_arg.value.string = host;
- subchannel_factory = grpc_subchannel_factory_add_channel_arg(subchannel_factory, &default_host_arg);
+ subchannel_factory = grpc_subchannel_factory_add_channel_arg(
+ subchannel_factory, &default_host_arg);
gpr_free(host);
gpr_free(port);
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 74584e7e2c..8419873908 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -60,9 +60,12 @@ typedef struct {
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels);
- /** the address that we've 'resolved' */
- struct sockaddr_storage addr;
- int addr_len;
+ /** the addresses that we've 'resolved' */
+ struct sockaddr_storage *addrs;
+ /** the corresponding length of the addresses */
+ int *addrs_len;
+ /** how many elements in \a addrs */
+ size_t num_addrs;
/** mutex guarding the rest of the state */
gpr_mu mu;
@@ -119,17 +122,22 @@ static void sockaddr_next(grpc_resolver *resolver,
static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
- grpc_subchannel *subchannel;
+ grpc_subchannel **subchannels;
grpc_subchannel_args args;
if (r->next_completion != NULL && !r->published) {
+ size_t i;
cfg = grpc_client_config_create();
- memset(&args, 0, sizeof(args));
- args.addr = (struct sockaddr *)&r->addr;
- args.addr_len = r->addr_len;
- subchannel =
- grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args);
- lb_policy = r->lb_policy_factory(&subchannel, 1);
+ subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs);
+ for (i = 0; i < r->num_addrs; i++) {
+ memset(&args, 0, sizeof(args));
+ args.addr = (struct sockaddr *)&r->addrs[i];
+ args.addr_len = r->addrs_len[i];
+ subchannels[i] = grpc_subchannel_factory_create_subchannel(
+ r->subchannel_factory, &args);
+ }
+ lb_policy = r->lb_policy_factory(subchannels, r->num_addrs);
+ gpr_free(subchannels);
grpc_client_config_set_lb_policy(cfg, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "unix");
r->published = 1;
@@ -143,6 +151,8 @@ static void sockaddr_destroy(grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
+ gpr_free(r->addrs);
+ gpr_free(r->addrs_len);
gpr_free(r);
}
@@ -238,13 +248,18 @@ done:
return result;
}
+static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_uri *uri,
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels),
grpc_subchannel_factory *subchannel_factory,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) {
+ size_t i;
+ int errors_found = 0; /* GPR_FALSE */
sockaddr_resolver *r;
+ gpr_slice path_slice;
+ gpr_slice_buffer path_parts;
if (0 != strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported");
@@ -253,7 +268,29 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
- if (!parse(uri, &r->addr, &r->addr_len)) {
+
+ path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing);
+ gpr_slice_buffer_init(&path_parts);
+
+ gpr_slice_split(path_slice, ",", &path_parts);
+ r->num_addrs = path_parts.count;
+ r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs);
+ r->addrs_len = gpr_malloc(sizeof(int) * r->num_addrs);
+
+ for(i = 0; i < r->num_addrs; i++) {
+ grpc_uri ith_uri = *uri;
+ char* part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
+ ith_uri.path = part_str;
+ if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) {
+ errors_found = 1; /* GPR_TRUE */
+ }
+ gpr_free(part_str);
+ if (errors_found) break;
+ }
+
+ gpr_slice_buffer_destroy(&path_parts);
+ gpr_slice_unref(path_slice);
+ if (errors_found) {
gpr_free(r);
return NULL;
}
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
new file mode 100644
index 0000000000..acb2ba136e
--- /dev/null
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -0,0 +1,501 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/client_config/resolvers/zookeeper_resolver.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include <grpc/grpc_zookeeper.h>
+#include <zookeeper/zookeeper.h>
+
+#include "src/core/client_config/lb_policies/pick_first.h"
+#include "src/core/client_config/resolver_registry.h"
+#include "src/core/iomgr/resolve_address.h"
+#include "src/core/support/string.h"
+#include "src/core/json/json.h"
+
+/** Zookeeper session expiration time in milliseconds */
+#define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000
+
+typedef struct {
+ /** base class: must be first */
+ grpc_resolver base;
+ /** refcount */
+ gpr_refcount refs;
+ /** name to resolve */
+ char *name;
+ /** subchannel factory */
+ grpc_subchannel_factory *subchannel_factory;
+ /** load balancing policy factory */
+ grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
+ size_t num_subchannels);
+
+ /** mutex guarding the rest of the state */
+ gpr_mu mu;
+ /** are we currently resolving? */
+ int resolving;
+ /** which version of resolved_config have we published? */
+ int published_version;
+ /** which version of resolved_config is current? */
+ int resolved_version;
+ /** pending next completion, or NULL */
+ grpc_iomgr_closure *next_completion;
+ /** target config address for next completion */
+ grpc_client_config **target_config;
+ /** current (fully resolved) config */
+ grpc_client_config *resolved_config;
+
+ /** zookeeper handle */
+ zhandle_t *zookeeper_handle;
+ /** zookeeper resolved addresses */
+ grpc_resolved_addresses *resolved_addrs;
+ /** total number of addresses to be resolved */
+ int resolved_total;
+ /** number of addresses resolved */
+ int resolved_num;
+} zookeeper_resolver;
+
+static void zookeeper_destroy(grpc_resolver *r);
+
+static void zookeeper_start_resolving_locked(zookeeper_resolver *r);
+static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r);
+
+static void zookeeper_shutdown(grpc_resolver *r);
+static void zookeeper_channel_saw_error(grpc_resolver *r,
+ struct sockaddr *failing_address,
+ int failing_address_len);
+static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config,
+ grpc_iomgr_closure *on_complete);
+
+static const grpc_resolver_vtable zookeeper_resolver_vtable = {
+ zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error,
+ zookeeper_next};
+
+static void zookeeper_shutdown(grpc_resolver *resolver) {
+ zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ if (r->next_completion != NULL) {
+ *r->target_config = NULL;
+ grpc_iomgr_add_callback(r->next_completion);
+ r->next_completion = NULL;
+ }
+ zookeeper_close(r->zookeeper_handle);
+ gpr_mu_unlock(&r->mu);
+}
+
+static void zookeeper_channel_saw_error(grpc_resolver *resolver,
+ struct sockaddr *sa, int len) {
+ zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ if (r->resolving == 0) {
+ zookeeper_start_resolving_locked(r);
+ }
+ gpr_mu_unlock(&r->mu);
+}
+
+static void zookeeper_next(grpc_resolver *resolver,
+ grpc_client_config **target_config,
+ grpc_iomgr_closure *on_complete) {
+ zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ GPR_ASSERT(r->next_completion == NULL);
+ r->next_completion = on_complete;
+ r->target_config = target_config;
+ if (r->resolved_version == 0 && r->resolving == 0) {
+ zookeeper_start_resolving_locked(r);
+ } else {
+ zookeeper_maybe_finish_next_locked(r);
+ }
+ gpr_mu_unlock(&r->mu);
+}
+
+/** Zookeeper global watcher for connection management
+ TODO: better connection management besides logs */
+static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type,
+ int state, const char *path,
+ void *watcher_ctx) {
+ if (type == ZOO_SESSION_EVENT) {
+ if (state == ZOO_EXPIRED_SESSION_STATE) {
+ gpr_log(GPR_ERROR, "Zookeeper session expired");
+ } else if (state == ZOO_AUTH_FAILED_STATE) {
+ gpr_log(GPR_ERROR, "Zookeeper authentication failed");
+ }
+ }
+}
+
+/** Zookeeper watcher triggered by changes to watched nodes
+ Once triggered, it tries to resolve again to get updated addresses */
+static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
+ const char *path, void *watcher_ctx) {
+ if (watcher_ctx != NULL) {
+ zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx;
+ if (state == ZOO_CONNECTED_STATE) {
+ gpr_mu_lock(&r->mu);
+ if (r->resolving == 0) {
+ zookeeper_start_resolving_locked(r);
+ }
+ gpr_mu_unlock(&r->mu);
+ }
+ }
+}
+
+/** Callback function after getting all resolved addresses
+ Creates a subchannel for each address */
+static void zookeeper_on_resolved(void *arg,
+ grpc_resolved_addresses *addresses) {
+ zookeeper_resolver *r = arg;
+ grpc_client_config *config = NULL;
+ grpc_subchannel **subchannels;
+ grpc_subchannel_args args;
+ grpc_lb_policy *lb_policy;
+ size_t i;
+ if (addresses != NULL) {
+ config = grpc_client_config_create();
+ subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs);
+ for (i = 0; i < addresses->naddrs; i++) {
+ memset(&args, 0, sizeof(args));
+ args.addr = (struct sockaddr *)(addresses->addrs[i].addr);
+ args.addr_len = addresses->addrs[i].len;
+ subchannels[i] = grpc_subchannel_factory_create_subchannel(
+ r->subchannel_factory, &args);
+ }
+ lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs);
+ grpc_client_config_set_lb_policy(config, lb_policy);
+ GRPC_LB_POLICY_UNREF(lb_policy, "construction");
+ grpc_resolved_addresses_destroy(addresses);
+ gpr_free(subchannels);
+ }
+ gpr_mu_lock(&r->mu);
+ GPR_ASSERT(r->resolving == 1);
+ r->resolving = 0;
+ if (r->resolved_config != NULL) {
+ grpc_client_config_unref(r->resolved_config);
+ }
+ r->resolved_config = config;
+ r->resolved_version++;
+ zookeeper_maybe_finish_next_locked(r);
+ gpr_mu_unlock(&r->mu);
+
+ GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving");
+}
+
+/** Callback function for each DNS resolved address */
+static void zookeeper_dns_resolved(void *arg,
+ grpc_resolved_addresses *addresses) {
+ size_t i;
+ zookeeper_resolver *r = arg;
+ int resolve_done = 0;
+
+ gpr_mu_lock(&r->mu);
+ r->resolved_num++;
+ r->resolved_addrs->addrs =
+ gpr_realloc(r->resolved_addrs->addrs,
+ sizeof(grpc_resolved_address) *
+ (r->resolved_addrs->naddrs + addresses->naddrs));
+ for (i = 0; i < addresses->naddrs; i++) {
+ memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr,
+ addresses->addrs[i].addr, addresses->addrs[i].len);
+ r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len =
+ addresses->addrs[i].len;
+ }
+
+ r->resolved_addrs->naddrs += addresses->naddrs;
+ grpc_resolved_addresses_destroy(addresses);
+
+ /** Wait for all addresses to be resolved */
+ resolve_done = (r->resolved_num == r->resolved_total);
+ gpr_mu_unlock(&r->mu);
+ if (resolve_done) {
+ zookeeper_on_resolved(r, r->resolved_addrs);
+ }
+}
+
+/** Parses JSON format address of a zookeeper node */
+static char *zookeeper_parse_address(const char *value, int value_len) {
+ grpc_json *json;
+ grpc_json *cur;
+ const char *host;
+ const char *port;
+ char *buffer;
+ char *address = NULL;
+
+ buffer = gpr_malloc(value_len);
+ memcpy(buffer, value, value_len);
+ json = grpc_json_parse_string_with_len(buffer, value_len);
+ if (json != NULL) {
+ host = NULL;
+ port = NULL;
+ for (cur = json->child; cur != NULL; cur = cur->next) {
+ if (!strcmp(cur->key, "host")) {
+ host = cur->value;
+ if (port != NULL) {
+ break;
+ }
+ } else if (!strcmp(cur->key, "port")) {
+ port = cur->value;
+ if (host != NULL) {
+ break;
+ }
+ }
+ }
+ if (host != NULL && port != NULL) {
+ gpr_asprintf(&address, "%s:%s", host, port);
+ }
+ grpc_json_destroy(json);
+ }
+ gpr_free(buffer);
+
+ return address;
+}
+
+static void zookeeper_get_children_node_completion(int rc, const char *value,
+ int value_len,
+ const struct Stat *stat,
+ const void *arg) {
+ char *address = NULL;
+ zookeeper_resolver *r = (zookeeper_resolver *)arg;
+ int resolve_done = 0;
+
+ if (rc != 0) {
+ gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name);
+ return;
+ }
+
+ address = zookeeper_parse_address(value, value_len);
+ if (address != NULL) {
+ /** Further resolves address by DNS */
+ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
+ gpr_free(address);
+ } else {
+ gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name);
+ gpr_mu_lock(&r->mu);
+ r->resolved_total--;
+ resolve_done = (r->resolved_num == r->resolved_total);
+ gpr_mu_unlock(&r->mu);
+ if (resolve_done) {
+ zookeeper_on_resolved(r, r->resolved_addrs);
+ }
+ }
+}
+
+static void zookeeper_get_children_completion(
+ int rc, const struct String_vector *children, const void *arg) {
+ char *path;
+ int status;
+ int i;
+ zookeeper_resolver *r = (zookeeper_resolver *)arg;
+
+ if (rc != 0) {
+ gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
+ return;
+ }
+
+ if (children->count == 0) {
+ gpr_log(GPR_ERROR, "Error in resolving zookeeper address %s", r->name);
+ return;
+ }
+
+ r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
+ r->resolved_addrs->addrs = NULL;
+ r->resolved_addrs->naddrs = 0;
+ r->resolved_total = children->count;
+
+ /** TODO: Replace expensive heap allocation with stack
+ if we can get maximum length of zookeeper path */
+ for (i = 0; i < children->count; i++) {
+ gpr_asprintf(&path, "%s/%s", r->name, children->data[i]);
+ status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r,
+ zookeeper_get_children_node_completion, r);
+ gpr_free(path);
+ if (status != 0) {
+ gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path);
+ }
+ }
+}
+
+static void zookeeper_get_node_completion(int rc, const char *value,
+ int value_len,
+ const struct Stat *stat,
+ const void *arg) {
+ int status;
+ char *address = NULL;
+ zookeeper_resolver *r = (zookeeper_resolver *)arg;
+ r->resolved_addrs = NULL;
+ r->resolved_total = 0;
+ r->resolved_num = 0;
+
+ if (rc != 0) {
+ gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
+ return;
+ }
+
+ /** If zookeeper node of path r->name does not have address
+ (i.e. service node), get its children */
+ address = zookeeper_parse_address(value, value_len);
+ if (address != NULL) {
+ r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
+ r->resolved_addrs->addrs = NULL;
+ r->resolved_addrs->naddrs = 0;
+ r->resolved_total = 1;
+ /** Further resolves address by DNS */
+ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
+ gpr_free(address);
+ return;
+ }
+
+ status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher,
+ r, zookeeper_get_children_completion, r);
+ if (status != 0) {
+ gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
+ }
+}
+
+static void zookeeper_resolve_address(zookeeper_resolver *r) {
+ int status;
+ status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r,
+ zookeeper_get_node_completion, r);
+ if (status != 0) {
+ gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
+ }
+}
+
+static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
+ GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving");
+ GPR_ASSERT(r->resolving == 0);
+ r->resolving = 1;
+ zookeeper_resolve_address(r);
+}
+
+static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
+ if (r->next_completion != NULL &&
+ r->resolved_version != r->published_version) {
+ *r->target_config = r->resolved_config;
+ if (r->resolved_config != NULL) {
+ grpc_client_config_ref(r->resolved_config);
+ }
+ grpc_iomgr_add_callback(r->next_completion);
+ r->next_completion = NULL;
+ r->published_version = r->resolved_version;
+ }
+}
+
+static void zookeeper_destroy(grpc_resolver *gr) {
+ zookeeper_resolver *r = (zookeeper_resolver *)gr;
+ gpr_mu_destroy(&r->mu);
+ if (r->resolved_config != NULL) {
+ grpc_client_config_unref(r->resolved_config);
+ }
+ grpc_subchannel_factory_unref(r->subchannel_factory);
+ gpr_free(r->name);
+ gpr_free(r);
+}
+
+static grpc_resolver *zookeeper_create(
+ grpc_uri *uri,
+ grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
+ size_t num_subchannels),
+ grpc_subchannel_factory *subchannel_factory) {
+ zookeeper_resolver *r;
+ size_t length;
+ char *path = uri->path;
+
+ if (0 == strcmp(uri->authority, "")) {
+ gpr_log(GPR_ERROR, "No authority specified in zookeeper uri");
+ return NULL;
+ }
+
+ /** Removes the trailing slash if exists */
+ length = strlen(path);
+ if (length > 1 && path[length - 1] == '/') {
+ path[length - 1] = 0;
+ }
+
+ r = gpr_malloc(sizeof(zookeeper_resolver));
+ memset(r, 0, sizeof(*r));
+ gpr_ref_init(&r->refs, 1);
+ gpr_mu_init(&r->mu);
+ grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
+ r->name = gpr_strdup(path);
+
+ r->subchannel_factory = subchannel_factory;
+ r->lb_policy_factory = lb_policy_factory;
+ grpc_subchannel_factory_ref(subchannel_factory);
+
+ /** Initializes zookeeper client */
+ zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
+ r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_global_watcher,
+ GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
+ if (r->zookeeper_handle == NULL) {
+ gpr_log(GPR_ERROR, "Unable to connect to zookeeper server");
+ return NULL;
+ }
+
+ return &r->base;
+}
+
+static void zookeeper_plugin_init() {
+ grpc_register_resolver_type("zookeeper",
+ grpc_zookeeper_resolver_factory_create());
+}
+
+void grpc_zookeeper_register() {
+ grpc_register_plugin(zookeeper_plugin_init, NULL);
+}
+
+/*
+ * FACTORY
+ */
+
+static void zookeeper_factory_ref(grpc_resolver_factory *factory) {}
+
+static void zookeeper_factory_unref(grpc_resolver_factory *factory) {}
+
+static grpc_resolver *zookeeper_factory_create_resolver(
+ grpc_resolver_factory *factory, grpc_uri *uri,
+ grpc_subchannel_factory *subchannel_factory) {
+ return zookeeper_create(uri, grpc_create_pick_first_lb_policy,
+ subchannel_factory);
+}
+
+static const grpc_resolver_factory_vtable zookeeper_factory_vtable = {
+ zookeeper_factory_ref, zookeeper_factory_unref,
+ zookeeper_factory_create_resolver};
+static grpc_resolver_factory zookeeper_resolver_factory = {
+ &zookeeper_factory_vtable};
+
+grpc_resolver_factory *grpc_zookeeper_resolver_factory_create() {
+ return &zookeeper_resolver_factory;
+}
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.h b/src/core/client_config/resolvers/zookeeper_resolver.h
new file mode 100644
index 0000000000..a6f002dd6d
--- /dev/null
+++ b/src/core/client_config/resolvers/zookeeper_resolver.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H
+#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H
+
+#include "src/core/client_config/resolver_factory.h"
+
+/** Create a zookeeper resolver factory */
+grpc_resolver_factory *grpc_zookeeper_resolver_factory_create(void);
+
+#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H */
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index d1cd33b2af..2e36c69134 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -91,8 +91,10 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify);
+/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_subchannel *channel,
grpc_pollset *pollset);
+/** stop following \a channel's activity through \a pollset. */
void grpc_subchannel_del_interested_party(grpc_subchannel *channel,
grpc_pollset *pollset);
diff --git a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c
index 7dc6d99ebe..585e465fa4 100644
--- a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c
+++ b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c
@@ -35,9 +35,9 @@
#include "src/core/client_config/subchannel_factory_decorators/merge_channel_args.h"
grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg(
- grpc_subchannel_factory *input, const grpc_arg *arg) {
- grpc_channel_args args;
- args.num_args = 1;
- args.args = (grpc_arg *)arg;
- return grpc_subchannel_factory_merge_channel_args(input, &args);
+ grpc_subchannel_factory *input, const grpc_arg *arg) {
+ grpc_channel_args args;
+ args.num_args = 1;
+ args.args = (grpc_arg *)arg;
+ return grpc_subchannel_factory_merge_channel_args(input, &args);
}
diff --git a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h
index 1937623374..8457294000 100644
--- a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h
+++ b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h
@@ -40,6 +40,7 @@
channel_args by adding a new argument; ownership of input, arg is retained
by the caller. */
grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg(
- grpc_subchannel_factory *input, const grpc_arg *arg);
+ grpc_subchannel_factory *input, const grpc_arg *arg);
-#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H */
+#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H \
+ */
diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
index 7e028857ac..c1b5507fde 100644
--- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
+++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
@@ -50,7 +50,7 @@ static void merge_args_factory_ref(grpc_subchannel_factory *scf) {
static void merge_args_factory_unref(grpc_subchannel_factory *scf) {
merge_args_factory *f = (merge_args_factory *)scf;
if (gpr_unref(&f->refs)) {
- grpc_subchannel_factory_unref(f->wrapped);
+ grpc_subchannel_factory_unref(f->wrapped);
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
@@ -73,7 +73,7 @@ static const grpc_subchannel_factory_vtable merge_args_factory_vtable = {
merge_args_factory_create_subchannel};
grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args(
- grpc_subchannel_factory *input, const grpc_channel_args *args) {
+ grpc_subchannel_factory *input, const grpc_channel_args *args) {
merge_args_factory *f = gpr_malloc(sizeof(*f));
f->base.vtable = &merge_args_factory_vtable;
gpr_ref_init(&f->refs, 1);
diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h
index 73a03b752f..f4757f0650 100644
--- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h
+++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h
@@ -40,6 +40,7 @@
channel_args by adding a new argument; ownership of input, args is retained
by the caller. */
grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args(
- grpc_subchannel_factory *input, const grpc_channel_args *args);
+ grpc_subchannel_factory *input, const grpc_channel_args *args);
-#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H */
+#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H \
+ */
diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c
index 0fd028741e..6ed6dbe93f 100644
--- a/src/core/compression/algorithm.c
+++ b/src/core/compression/algorithm.c
@@ -35,13 +35,20 @@
#include <string.h>
#include <grpc/compression.h>
-int grpc_compression_algorithm_parse(const char* name,
+int grpc_compression_algorithm_parse(const char *name, size_t name_length,
grpc_compression_algorithm *algorithm) {
- if (strcmp(name, "identity") == 0) {
+ /* we use strncmp not only because it's safer (even though in this case it
+ * doesn't matter, given that we are comparing against string literals, but
+ * because this way we needn't have "name" nil-terminated (useful for slice
+ * data, for example) */
+ if (name_length == 0) {
+ return 0;
+ }
+ if (strncmp(name, "identity", name_length) == 0) {
*algorithm = GRPC_COMPRESS_NONE;
- } else if (strcmp(name, "gzip") == 0) {
+ } else if (strncmp(name, "gzip", name_length) == 0) {
*algorithm = GRPC_COMPRESS_GZIP;
- } else if (strcmp(name, "deflate") == 0) {
+ } else if (strncmp(name, "deflate", name_length) == 0) {
*algorithm = GRPC_COMPRESS_DEFLATE;
} else {
return 0;
diff --git a/src/core/debug/trace.c b/src/core/debug/trace.c
index b53dfe804b..1014b1f4db 100644
--- a/src/core/debug/trace.c
+++ b/src/core/debug/trace.c
@@ -61,8 +61,8 @@ static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
size_t np = n + 1;
char *s = gpr_malloc(end - beg + 1);
memcpy(s, beg, end - beg);
- s[end-beg] = 0;
- *ss = gpr_realloc(*ss, sizeof(char**) * np);
+ s[end - beg] = 0;
+ *ss = gpr_realloc(*ss, sizeof(char **) * np);
(*ss)[n] = s;
*ns = np;
}
@@ -73,7 +73,7 @@ static void split(const char *s, char ***ss, size_t *ns) {
add(s, s + strlen(s), ss, ns);
} else {
add(s, c, ss, ns);
- split(c+1, ss, ns);
+ split(c + 1, ss, ns);
}
}
@@ -125,7 +125,7 @@ int grpc_tracer_set_enabled(const char *name, int enabled) {
}
if (!found) {
gpr_log(GPR_ERROR, "Unknown trace var: '%s'", name);
- return 0; /* early return */
+ return 0; /* early return */
}
}
return 1;
diff --git a/src/core/debug/trace.h b/src/core/debug/trace.h
index fc8615bc69..dc5875976e 100644
--- a/src/core/debug/trace.h
+++ b/src/core/debug/trace.h
@@ -40,4 +40,4 @@ void grpc_register_tracer(const char *name, int *flag);
void grpc_tracer_init(const char *env_var_name);
void grpc_tracer_shutdown(void);
-#endif /* GRPC_INTERNAL_CORE_DEBUG_TRACE_H */
+#endif /* GRPC_INTERNAL_CORE_DEBUG_TRACE_H */
diff --git a/src/core/httpcli/format_request.c b/src/core/httpcli/format_request.c
index e875423e87..6189fce86b 100644
--- a/src/core/httpcli/format_request.c
+++ b/src/core/httpcli/format_request.c
@@ -43,7 +43,8 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec *buf) {
+static void fill_common_header(const grpc_httpcli_request *request,
+ gpr_strvec *buf) {
size_t i;
gpr_strvec_add(buf, gpr_strdup(request->path));
gpr_strvec_add(buf, gpr_strdup(" HTTP/1.0\r\n"));
@@ -52,7 +53,8 @@ static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec *
gpr_strvec_add(buf, gpr_strdup(request->host));
gpr_strvec_add(buf, gpr_strdup("\r\n"));
gpr_strvec_add(buf, gpr_strdup("Connection: close\r\n"));
- gpr_strvec_add(buf, gpr_strdup("User-Agent: "GRPC_HTTPCLI_USER_AGENT"\r\n"));
+ gpr_strvec_add(buf,
+ gpr_strdup("User-Agent: " GRPC_HTTPCLI_USER_AGENT "\r\n"));
/* user supplied headers */
for (i = 0; i < request->hdr_count; i++) {
gpr_strvec_add(buf, gpr_strdup(request->hdrs[i].key));
diff --git a/src/core/httpcli/format_request.h b/src/core/httpcli/format_request.h
index 8bfb20bfd0..c8dc8f7d4e 100644
--- a/src/core/httpcli/format_request.h
+++ b/src/core/httpcli/format_request.h
@@ -42,4 +42,4 @@ gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request,
const char *body_bytes,
size_t body_size);
-#endif /* GRPC_INTERNAL_CORE_HTTPCLI_FORMAT_REQUEST_H */
+#endif /* GRPC_INTERNAL_CORE_HTTPCLI_FORMAT_REQUEST_H */
diff --git a/src/core/httpcli/parser.h b/src/core/httpcli/parser.h
index 71280e7479..3fbb4c7479 100644
--- a/src/core/httpcli/parser.h
+++ b/src/core/httpcli/parser.h
@@ -61,4 +61,4 @@ void grpc_httpcli_parser_destroy(grpc_httpcli_parser *parser);
int grpc_httpcli_parser_parse(grpc_httpcli_parser *parser, gpr_slice slice);
int grpc_httpcli_parser_eof(grpc_httpcli_parser *parser);
-#endif /* GRPC_INTERNAL_CORE_HTTPCLI_PARSER_H */
+#endif /* GRPC_INTERNAL_CORE_HTTPCLI_PARSER_H */
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
index 68d33b9cf6..ddb30dc4bb 100644
--- a/src/core/iomgr/alarm.c
+++ b/src/core/iomgr/alarm.c
@@ -105,8 +105,7 @@ void grpc_alarm_list_init(gpr_timespec now) {
void grpc_alarm_list_shutdown(void) {
int i;
- while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL,
- 0))
+ while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL, 0))
;
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
@@ -362,7 +361,7 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) {
GPR_ASSERT(now.clock_type == g_clock_type);
return run_some_expired_alarms(
- drop_mu, now, next,
+ drop_mu, now, next,
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
}
diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h
index c067a0b8a3..4a13527e64 100644
--- a/src/core/iomgr/alarm.h
+++ b/src/core/iomgr/alarm.h
@@ -86,4 +86,4 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
Requires: cancel() must happen after add() on a given alarm */
void grpc_alarm_cancel(grpc_alarm *alarm);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_H */
diff --git a/src/core/iomgr/alarm_heap.c b/src/core/iomgr/alarm_heap.c
index d912178fda..daed251982 100644
--- a/src/core/iomgr/alarm_heap.c
+++ b/src/core/iomgr/alarm_heap.c
@@ -66,11 +66,11 @@ static void adjust_downwards(grpc_alarm **first, int i, int length,
int next_i;
if (left_child >= length) break;
right_child = left_child + 1;
- next_i =
- right_child < length && gpr_time_cmp(first[left_child]->deadline,
- first[right_child]->deadline) < 0
- ? right_child
- : left_child;
+ next_i = right_child < length &&
+ gpr_time_cmp(first[left_child]->deadline,
+ first[right_child]->deadline) < 0
+ ? right_child
+ : left_child;
if (gpr_time_cmp(t->deadline, first[next_i]->deadline) >= 0) break;
first[i] = first[next_i];
first[i]->heap_index = i;
diff --git a/src/core/iomgr/alarm_heap.h b/src/core/iomgr/alarm_heap.h
index c5adfc6d31..60db6c991b 100644
--- a/src/core/iomgr/alarm_heap.h
+++ b/src/core/iomgr/alarm_heap.h
@@ -54,4 +54,4 @@ void grpc_alarm_heap_pop(grpc_alarm_heap *heap);
int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_HEAP_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_HEAP_H */
diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h
index 0268a01bad..e9f98a3444 100644
--- a/src/core/iomgr/alarm_internal.h
+++ b/src/core/iomgr/alarm_internal.h
@@ -59,4 +59,4 @@ gpr_timespec grpc_alarm_list_next_timeout(void);
void grpc_kick_poller(void);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_INTERNAL_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_INTERNAL_H */
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 744fe7656c..8ee14bce9b 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -50,7 +50,8 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
ep->vtable->add_to_pollset(ep, pollset);
}
-void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) {
+void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep,
+ grpc_pollset_set *pollset_set) {
ep->vtable->add_to_pollset_set(ep, pollset_set);
}
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index a2216925f9..ea92a500e8 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -103,10 +103,11 @@ void grpc_endpoint_destroy(grpc_endpoint *ep);
/* Add an endpoint to a pollset, so that when the pollset is polled, events from
this endpoint are considered */
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset);
-void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set);
+void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep,
+ grpc_pollset_set *pollset_set);
struct grpc_endpoint {
const grpc_endpoint_vtable *vtable;
};
-#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H */
diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h
index 25087be0c7..095ec5fcc9 100644
--- a/src/core/iomgr/endpoint_pair.h
+++ b/src/core/iomgr/endpoint_pair.h
@@ -44,4 +44,4 @@ typedef struct {
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */
diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c
index e8295df8b3..db9d092dca 100644
--- a/src/core/iomgr/endpoint_pair_windows.c
+++ b/src/core/iomgr/endpoint_pair_windows.c
@@ -52,21 +52,26 @@ static void create_sockets(SOCKET sv[2]) {
SOCKADDR_IN addr;
int addr_len = sizeof(addr);
- lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
+ lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
+ WSA_FLAG_OVERLAPPED);
GPR_ASSERT(lst_sock != INVALID_SOCKET);
memset(&addr, 0, sizeof(addr));
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_family = AF_INET;
- GPR_ASSERT(bind(lst_sock, (struct sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR);
+ GPR_ASSERT(bind(lst_sock, (struct sockaddr *)&addr, sizeof(addr)) !=
+ SOCKET_ERROR);
GPR_ASSERT(listen(lst_sock, SOMAXCONN) != SOCKET_ERROR);
- GPR_ASSERT(getsockname(lst_sock, (struct sockaddr*)&addr, &addr_len) != SOCKET_ERROR);
+ GPR_ASSERT(getsockname(lst_sock, (struct sockaddr *)&addr, &addr_len) !=
+ SOCKET_ERROR);
- cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
+ cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
+ WSA_FLAG_OVERLAPPED);
GPR_ASSERT(cli_sock != INVALID_SOCKET);
- GPR_ASSERT(WSAConnect(cli_sock, (struct sockaddr*)&addr, addr_len, NULL, NULL, NULL, NULL) == 0);
- svr_sock = accept(lst_sock, (struct sockaddr*)&addr, &addr_len);
+ GPR_ASSERT(WSAConnect(cli_sock, (struct sockaddr *)&addr, addr_len, NULL,
+ NULL, NULL, NULL) == 0);
+ svr_sock = accept(lst_sock, (struct sockaddr *)&addr, &addr_len);
GPR_ASSERT(svr_sock != INVALID_SOCKET);
closesocket(lst_sock);
@@ -77,7 +82,8 @@ static void create_sockets(SOCKET sv[2]) {
sv[0] = svr_sock;
}
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
+ size_t read_slice_size) {
SOCKET sv[2];
grpc_endpoint_pair p;
create_sockets(sv);
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 8741241fb8..09a457dd9a 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -65,18 +65,17 @@ static void do_iocp_work() {
LPOVERLAPPED overlapped;
grpc_winsocket *socket;
grpc_winsocket_callback_info *info;
- void(*f)(void *, int) = NULL;
+ void (*f)(void *, int) = NULL;
void *opaque = NULL;
- success = GetQueuedCompletionStatus(g_iocp, &bytes,
- &completion_key, &overlapped,
- INFINITE);
+ success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key,
+ &overlapped, INFINITE);
/* success = 0 and overlapped = NULL means the deadline got attained.
Which is impossible. since our wait time is +inf */
GPR_ASSERT(success || overlapped);
GPR_ASSERT(completion_key && overlapped);
if (overlapped == &g_iocp_custom_overlap) {
gpr_atm_full_fetch_add(&g_custom_events, -1);
- if (completion_key == (ULONG_PTR) &g_iocp_kick_token) {
+ if (completion_key == (ULONG_PTR)&g_iocp_kick_token) {
/* We were awoken from a kick. */
return;
}
@@ -84,7 +83,7 @@ static void do_iocp_work() {
abort();
}
- socket = (grpc_winsocket*) completion_key;
+ socket = (grpc_winsocket *)completion_key;
if (overlapped == &socket->write_info.overlapped) {
info = &socket->write_info;
} else if (overlapped == &socket->read_info.overlapped) {
@@ -121,8 +120,7 @@ static void do_iocp_work() {
}
static void iocp_loop(void *p) {
- while (gpr_atm_acq_load(&g_orphans) ||
- gpr_atm_acq_load(&g_custom_events) ||
+ while (gpr_atm_acq_load(&g_orphans) || gpr_atm_acq_load(&g_custom_events) ||
!gpr_event_get(&g_shutdown_iocp)) {
grpc_maybe_call_delayed_callbacks(NULL, 1);
do_iocp_work();
@@ -134,8 +132,8 @@ static void iocp_loop(void *p) {
void grpc_iocp_init(void) {
gpr_thd_id id;
- g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
- NULL, (ULONG_PTR)NULL, 0);
+ g_iocp =
+ CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
GPR_ASSERT(g_iocp);
gpr_event_init(&g_iocp_done);
@@ -147,8 +145,7 @@ void grpc_iocp_kick(void) {
BOOL success;
gpr_atm_full_fetch_add(&g_custom_events, 1);
- success = PostQueuedCompletionStatus(g_iocp, 0,
- (ULONG_PTR) &g_iocp_kick_token,
+ success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR)&g_iocp_kick_token,
&g_iocp_custom_overlap);
GPR_ASSERT(success);
}
@@ -165,8 +162,8 @@ void grpc_iocp_shutdown(void) {
void grpc_iocp_add_socket(grpc_winsocket *socket) {
HANDLE ret;
if (socket->added_to_iocp) return;
- ret = CreateIoCompletionPort((HANDLE)socket->socket,
- g_iocp, (gpr_uintptr) socket, 0);
+ ret = CreateIoCompletionPort((HANDLE)socket->socket, g_iocp,
+ (gpr_uintptr)socket, 0);
if (!ret) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);
@@ -189,7 +186,7 @@ void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
the callback now.
-) The IOCP hasn't completed yet, and we're queuing it for later. */
static void socket_notify_on_iocp(grpc_winsocket *socket,
- void(*cb)(void *, int), void *opaque,
+ void (*cb)(void *, int), void *opaque,
grpc_winsocket_callback_info *info) {
int run_now = 0;
GPR_ASSERT(!info->cb);
@@ -206,13 +203,13 @@ static void socket_notify_on_iocp(grpc_winsocket *socket,
}
void grpc_socket_notify_on_write(grpc_winsocket *socket,
- void(*cb)(void *, int), void *opaque) {
+ void (*cb)(void *, int), void *opaque) {
socket_notify_on_iocp(socket, cb, opaque, &socket->write_info);
}
-void grpc_socket_notify_on_read(grpc_winsocket *socket,
- void(*cb)(void *, int), void *opaque) {
+void grpc_socket_notify_on_read(grpc_winsocket *socket, void (*cb)(void *, int),
+ void *opaque) {
socket_notify_on_iocp(socket, cb, opaque, &socket->read_info);
}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h
index 9df6476917..ee3847a229 100644
--- a/src/core/iomgr/iocp_windows.h
+++ b/src/core/iomgr/iocp_windows.h
@@ -44,10 +44,10 @@ void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket *);
void grpc_iocp_socket_orphan(grpc_winsocket *);
-void grpc_socket_notify_on_write(grpc_winsocket *, void(*cb)(void *, int success),
- void *opaque);
+void grpc_socket_notify_on_write(grpc_winsocket *,
+ void (*cb)(void *, int success), void *opaque);
-void grpc_socket_notify_on_read(grpc_winsocket *, void(*cb)(void *, int success),
- void *opaque);
+void grpc_socket_notify_on_read(grpc_winsocket *,
+ void (*cb)(void *, int success), void *opaque);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_IOCP_WINDOWS_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_IOCP_WINDOWS_H */
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 6d4a82917b..261c17366a 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -77,4 +77,4 @@ void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
argument. */
void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h
index 6c1e0e1799..4cec973ba0 100644
--- a/src/core/iomgr/iomgr_internal.h
+++ b/src/core/iomgr/iomgr_internal.h
@@ -52,4 +52,4 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj);
void grpc_iomgr_platform_init(void);
void grpc_iomgr_platform_shutdown(void);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */
diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c
index 758ae77b86..2425e59941 100644
--- a/src/core/iomgr/iomgr_posix.c
+++ b/src/core/iomgr/iomgr_posix.c
@@ -51,4 +51,4 @@ void grpc_iomgr_platform_shutdown(void) {
grpc_fd_global_shutdown();
}
-#endif /* GRPC_POSIX_SOCKET */
+#endif /* GRPC_POSIX_SOCKET */
diff --git a/src/core/iomgr/iomgr_posix.h b/src/core/iomgr/iomgr_posix.h
index a404f6433e..716fedb636 100644
--- a/src/core/iomgr/iomgr_posix.h
+++ b/src/core/iomgr/iomgr_posix.h
@@ -39,4 +39,4 @@
void grpc_pollset_global_init(void);
void grpc_pollset_global_shutdown(void);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_POSIX_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_POSIX_H */
diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c
index 74cd5a829b..b49cb87e97 100644
--- a/src/core/iomgr/iomgr_windows.c
+++ b/src/core/iomgr/iomgr_windows.c
@@ -68,4 +68,4 @@ void grpc_iomgr_platform_shutdown(void) {
winsock_shutdown();
}
-#endif /* GRPC_WINSOCK_SOCKET */
+#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index c474e4dbf1..337596cb74 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -74,10 +74,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
not be released by grpc_pollset_work AFTER worker has been destroyed.
- Returns true if some work has been done, and false if the deadline
- expired. */
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec deadline);
+ Tries not to block past deadline. */
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec now, gpr_timespec deadline);
/* Break one polling thread out of polling work for this pollset.
If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 1320c64579..fe66ebed77 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -181,7 +181,7 @@ static void multipoll_with_epoll_pollset_maybe_work(
pfds[1].events = POLLIN;
pfds[1].revents = 0;
- poll_rv = poll(pfds, 2, timeout_ms);
+ poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
if (poll_rv < 0) {
if (errno != EINTR) {
@@ -234,8 +234,7 @@ static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
- multipoll_with_epoll_pollset_add_fd,
- multipoll_with_epoll_pollset_del_fd,
+ multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd,
multipoll_with_epoll_pollset_maybe_work,
multipoll_with_epoll_pollset_finish_shutdown,
multipoll_with_epoll_pollset_destroy};
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index b5b2d7534d..30ee6e24db 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -74,7 +74,7 @@ static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset,
}
h->fds[h->fd_count++] = fd;
GRPC_FD_REF(fd, "multipoller");
-exit:
+exit:
if (and_unlock_pollset) {
gpr_mu_unlock(&pollset->mu);
}
@@ -144,7 +144,7 @@ static void multipoll_with_poll_pollset_maybe_work(
POLLOUT, &watchers[i]);
}
- r = poll(pfds, pfd_count, timeout);
+ r = grpc_poll_function(pfds, pfd_count, timeout);
for (i = 1; i < pfd_count; i++) {
grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
@@ -202,8 +202,7 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable multipoll_with_poll_pollset = {
- multipoll_with_poll_pollset_add_fd,
- multipoll_with_poll_pollset_del_fd,
+ multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd,
multipoll_with_poll_pollset_maybe_work,
multipoll_with_poll_pollset_finish_shutdown,
multipoll_with_poll_pollset_destroy};
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index d3a9193af1..6bd1b61f24 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -38,7 +38,6 @@
#include "src/core/iomgr/pollset_posix.h"
#include <errno.h>
-#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
@@ -57,6 +56,8 @@
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
+grpc_poll_function_type grpc_poll_function = poll;
+
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
worker->next->prev = worker->prev;
@@ -89,6 +90,7 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
}
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+ /* pollset->mu already held */
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (specific_worker = p->root_worker.next;
@@ -140,10 +142,10 @@ void grpc_pollset_init(grpc_pollset *pollset) {
void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
gpr_mu_lock(&pollset->mu);
pollset->vtable->add_fd(pollset, fd, 1);
- /* the following (enabled only in debug) will reacquire and then release
- our lock - meaning that if the unlocking flag passed to del_fd above is
- not respected, the code will deadlock (in a way that we have a chance of
- debugging) */
+/* the following (enabled only in debug) will reacquire and then release
+ our lock - meaning that if the unlocking flag passed to del_fd above is
+ not respected, the code will deadlock (in a way that we have a chance of
+ debugging) */
#ifndef NDEBUG
gpr_mu_lock(&pollset->mu);
gpr_mu_unlock(&pollset->mu);
@@ -153,10 +155,10 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
gpr_mu_lock(&pollset->mu);
pollset->vtable->del_fd(pollset, fd, 1);
- /* the following (enabled only in debug) will reacquire and then release
- our lock - meaning that if the unlocking flag passed to del_fd above is
- not respected, the code will deadlock (in a way that we have a chance of
- debugging) */
+/* the following (enabled only in debug) will reacquire and then release
+ our lock - meaning that if the unlocking flag passed to del_fd above is
+ not respected, the code will deadlock (in a way that we have a chance of
+ debugging) */
#ifndef NDEBUG
gpr_mu_lock(&pollset->mu);
gpr_mu_unlock(&pollset->mu);
@@ -168,14 +170,10 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec deadline) {
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec now, gpr_timespec deadline) {
/* pollset->mu already held */
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
int added_worker = 0;
- if (gpr_time_cmp(now, deadline) > 0) {
- return 0;
- }
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
@@ -217,7 +215,6 @@ done:
gpr_mu_lock(&pollset->mu);
}
}
- return 1;
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
@@ -456,7 +453,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
/* poll fd count (argument 2) is shortened by one if we have no events
to poll on - such that it only includes the kicker */
- r = poll(pfd, nfds, timeout);
+ r = grpc_poll_function(pfd, nfds, timeout);
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 1c1b736193..69bd9cca8c 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
+#include <poll.h>
+
#include <grpc/support/sync.h>
#include "src/core/iomgr/wakeup_fd_posix.h"
@@ -102,7 +104,8 @@ void grpc_kick_drain(grpc_pollset *p);
- longer than a millisecond polls are rounded up to the next nearest
millisecond to avoid spinning
- infinite timeouts are converted to -1 */
-int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now);
+int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now);
/* turn a pollset into a multipoller: platform specific */
typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset,
@@ -117,4 +120,8 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
* be locked) */
int grpc_pollset_has_workers(grpc_pollset *pollset);
+/* override to allow tests to hook poll() usage */
+typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
+extern grpc_poll_function_type grpc_poll_function;
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 22dc5891c3..07522c8a0c 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -56,8 +56,7 @@ static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
grpc_pollset_worker *w = p->root_worker.next;
remove_worker(p, w);
return w;
- }
- else {
+ } else {
return NULL;
}
}
@@ -100,13 +99,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) {
- gpr_timespec now;
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec now, gpr_timespec deadline) {
int added_worker = 0;
- now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (gpr_time_cmp(now, deadline) > 0) {
- return 0 /* GPR_FALSE */;
- }
worker->next = worker->prev = NULL;
gpr_cv_init(&worker->cv);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
@@ -127,15 +122,14 @@ done:
if (added_worker) {
remove_worker(pollset, worker);
}
- return 1 /* GPR_TRUE */;
}
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (specific_worker = p->root_worker.next;
- specific_worker != &p->root_worker;
- specific_worker = specific_worker->next) {
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
gpr_cv_signal(&specific_worker->cv);
}
p->kicked_without_pollers = 1;
diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h
index 8f1d7a22bb..cc1bd428b0 100644
--- a/src/core/iomgr/resolve_address.h
+++ b/src/core/iomgr/resolve_address.h
@@ -66,4 +66,4 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses);
grpc_resolved_addresses *grpc_blocking_resolve_address(
const char *addr, const char *default_port);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H */
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index dbf884c769..ce6972b797 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -105,10 +105,7 @@ grpc_resolved_addresses *grpc_blocking_resolve_address(
s = getaddrinfo(host, port, &hints, &result);
if (s != 0) {
/* Retry if well-known service name is recognized */
- char *svc[][2] = {
- {"http", "80"},
- {"https", "443"}
- };
+ char *svc[][2] = {{"http", "80"}, {"https", "443"}};
int i;
for (i = 0; i < (int)(sizeof(svc) / sizeof(svc[0])); i++) {
if (strcmp(port, svc[i][0]) == 0) {
diff --git a/src/core/iomgr/sockaddr.h b/src/core/iomgr/sockaddr.h
index 7528db73b8..e41e1ec6b4 100644
--- a/src/core/iomgr/sockaddr.h
+++ b/src/core/iomgr/sockaddr.h
@@ -44,4 +44,4 @@
#include "src/core/iomgr/sockaddr_posix.h"
#endif
-#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_H */
diff --git a/src/core/iomgr/sockaddr_posix.h b/src/core/iomgr/sockaddr_posix.h
index 2a3d932f70..388abb3306 100644
--- a/src/core/iomgr/sockaddr_posix.h
+++ b/src/core/iomgr/sockaddr_posix.h
@@ -41,4 +41,4 @@
#include <netdb.h>
#include <unistd.h>
-#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_POSIX_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_POSIX_H */
diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c
index 65ec1f94ac..efdc480365 100644
--- a/src/core/iomgr/sockaddr_utils.c
+++ b/src/core/iomgr/sockaddr_utils.c
@@ -206,7 +206,8 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) {
case AF_UNIX:
return 1;
default:
- gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port", addr->sa_family);
+ gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port",
+ addr->sa_family);
return 0;
}
}
@@ -220,7 +221,8 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port) {
((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
return 1;
default:
- gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", addr->sa_family);
+ gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port",
+ addr->sa_family);
return 0;
}
}
diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h
index 99f1ed54da..6f7a279900 100644
--- a/src/core/iomgr/sockaddr_utils.h
+++ b/src/core/iomgr/sockaddr_utils.h
@@ -86,4 +86,4 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
char *grpc_sockaddr_to_uri(const struct sockaddr *addr);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_UTILS_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_UTILS_H */
diff --git a/src/core/iomgr/sockaddr_win32.h b/src/core/iomgr/sockaddr_win32.h
index be55db805a..fe2be99145 100644
--- a/src/core/iomgr/sockaddr_win32.h
+++ b/src/core/iomgr/sockaddr_win32.h
@@ -43,4 +43,4 @@
const char *inet_ntop(int af, const void *src, char *dst, socklen_t size);
#endif
-#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_WIN32_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_WIN32_H */
diff --git a/src/core/iomgr/socket_utils_posix.h b/src/core/iomgr/socket_utils_posix.h
index d2a315b462..d330d1986e 100644
--- a/src/core/iomgr/socket_utils_posix.h
+++ b/src/core/iomgr/socket_utils_posix.h
@@ -110,4 +110,4 @@ extern int grpc_forbid_dualstack_sockets_for_testing;
int grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
int protocol, grpc_dualstack_mode *dsmode);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_UTILS_POSIX_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_UTILS_POSIX_H */
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index f6ddfff0ad..7d8421376b 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -106,4 +106,4 @@ void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
gpr_free(winsocket);
}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index 346fde8edd..ecf2530173 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -54,7 +54,7 @@ typedef struct grpc_winsocket_callback_info {
OVERLAPPED overlapped;
/* The callback information for the pending operation. May be empty if the
caller hasn't registered a callback yet. */
- void(*cb)(void *opaque, int success);
+ void (*cb)(void *opaque, int success);
void *opaque;
/* A boolean to describe if the IO Completion Port got a notification for
that operation. This will happen if the operation completed before the
@@ -118,4 +118,4 @@ void grpc_winsocket_orphan(grpc_winsocket *socket);
or by grpc_winsocket_orphan if there's no pending operation. */
void grpc_winsocket_destroy(grpc_winsocket *socket);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */
diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h
index 0fa08b52b0..8ad9b818e1 100644
--- a/src/core/iomgr/tcp_client.h
+++ b/src/core/iomgr/tcp_client.h
@@ -41,7 +41,7 @@
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
- NULL on failure).
+ NULL on failure).
interested_parties points to a set of pollsets that would be interested
in this connection being established (in order to continue their work) */
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 9572ce5980..66027f87a0 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -264,7 +264,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac->write_closure.cb_arg = ac;
gpr_mu_lock(&ac->mu);
- grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ grpc_alarm_init(&ac->alarm,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
gpr_mu_unlock(&ac->mu);
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 24fee0596f..360e6ebd8c 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -572,7 +572,8 @@ static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_pollset_add_fd(pollset, tcp->em_fd);
}
-static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) {
+static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep,
+ grpc_pollset_set *pollset_set) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
}
diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h
index d752feaeea..40b3ae2679 100644
--- a/src/core/iomgr/tcp_posix.h
+++ b/src/core/iomgr/tcp_posix.h
@@ -56,4 +56,4 @@ extern int grpc_tcp_trace;
grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size,
const char *peer_string);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index 0adbe9507c..d0478d3604 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -79,7 +79,8 @@ struct grpc_tcp_server {
/* active port count: how many ports are actually still listening */
int active_ports;
- /* number of iomgr callbacks that have been explicitly scheduled during shutdown */
+ /* number of iomgr callbacks that have been explicitly scheduled during
+ * shutdown */
int iomgr_callbacks_pending;
/* all listening ports */
@@ -292,7 +293,7 @@ static void on_accept(void *arg, int from_iocp) {
and act accordingly. */
transfered_bytes = 0;
wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
- &transfered_bytes, FALSE, &flags);
+ &transfered_bytes, FALSE, &flags);
if (!wsa_success) {
if (sp->shutting_down) {
/* During the shutdown case, we ARE expecting an error. So that's well,
@@ -309,16 +310,15 @@ static void on_accept(void *arg, int from_iocp) {
if (!sp->shutting_down) {
peer_name_string = NULL;
err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
- (char *)&sp->socket->socket,
- sizeof(sp->socket->socket));
+ (char *)&sp->socket->socket, sizeof(sp->socket->socket));
if (err) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
gpr_free(utf8_message);
}
- err = getpeername(sock, (struct sockaddr*)&peer_name, &peer_name_len);
+ err = getpeername(sock, (struct sockaddr *)&peer_name, &peer_name_len);
if (!err) {
- peer_name_string = grpc_sockaddr_to_uri((struct sockaddr*)&peer_name);
+ peer_name_string = grpc_sockaddr_to_uri((struct sockaddr *)&peer_name);
} else {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 89aa741470..123f46d71d 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -55,24 +55,22 @@ static int set_non_block(SOCKET sock) {
int status;
unsigned long param = 1;
DWORD ret;
- status = WSAIoctl(sock, FIONBIO, &param, sizeof(param), NULL, 0, &ret,
- NULL, NULL);
+ status =
+ WSAIoctl(sock, FIONBIO, &param, sizeof(param), NULL, 0, &ret, NULL, NULL);
return status == 0;
}
static int set_dualstack(SOCKET sock) {
int status;
unsigned long param = 0;
- status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
- (const char *) &param, sizeof(param));
+ status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char *)&param,
+ sizeof(param));
return status == 0;
}
int grpc_tcp_prepare_socket(SOCKET sock) {
- if (!set_non_block(sock))
- return 0;
- if (!set_dualstack(sock))
- return 0;
+ if (!set_non_block(sock)) return 0;
+ if (!set_dualstack(sock)) return 0;
return 1;
}
@@ -100,9 +98,7 @@ typedef struct grpc_tcp {
char *peer_string;
} grpc_tcp;
-static void tcp_ref(grpc_tcp *tcp) {
- gpr_ref(&tcp->refcount);
-}
+static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
static void tcp_unref(grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
@@ -116,7 +112,7 @@ static void tcp_unref(grpc_tcp *tcp) {
/* Asynchronous callback from the IOCP, or the background thread. */
static void on_read(void *tcpp, int from_iocp) {
- grpc_tcp *tcp = (grpc_tcp *) tcpp;
+ grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
gpr_slice *slice = NULL;
@@ -175,9 +171,9 @@ static void on_read(void *tcpp, int from_iocp) {
cb(opaque, slice, nslices, status);
}
-static void win_notify_on_read(grpc_endpoint *ep,
- grpc_endpoint_read_cb cb, void *arg) {
- grpc_tcp *tcp = (grpc_tcp *) ep;
+static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+ void *arg) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->read_info;
int status;
@@ -201,8 +197,8 @@ static void win_notify_on_read(grpc_endpoint *ep,
buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
/* First let's try a synchronous, non-blocking read. */
- status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
- NULL, NULL);
+ status =
+ WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError();
/* Did we get data immediately ? Yay. */
@@ -232,7 +228,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
/* Asynchronous callback from the IOCP, or the background thread. */
static void on_write(void *tcpp, int from_iocp) {
- grpc_tcp *tcp = (grpc_tcp *) tcpp;
+ grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
@@ -286,7 +282,7 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
gpr_slice *slices, size_t nslices,
grpc_endpoint_write_cb cb,
void *arg) {
- grpc_tcp *tcp = (grpc_tcp *) ep;
+ grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *socket = tcp->socket;
grpc_winsocket_callback_info *info = &socket->write_info;
unsigned i;
@@ -309,7 +305,7 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
- buffers = (WSABUF *) gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count);
+ buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count);
allocated = buffers;
}
@@ -370,15 +366,15 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) {
grpc_tcp *tcp;
- (void) ps;
- tcp = (grpc_tcp *) ep;
+ (void)ps;
+ tcp = (grpc_tcp *)ep;
grpc_iocp_add_socket(tcp->socket);
}
static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) {
grpc_tcp *tcp;
- (void) pss;
- tcp = (grpc_tcp *) ep;
+ (void)pss;
+ tcp = (grpc_tcp *)ep;
grpc_iocp_add_socket(tcp->socket);
}
@@ -389,7 +385,7 @@ static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) {
callback will happen from another thread, so we need to protect against
concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint *ep) {
- grpc_tcp *tcp = (grpc_tcp *) ep;
+ grpc_tcp *tcp = (grpc_tcp *)ep;
int extra_refs = 0;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
@@ -401,7 +397,7 @@ static void win_shutdown(grpc_endpoint *ep) {
}
static void win_destroy(grpc_endpoint *ep) {
- grpc_tcp *tcp = (grpc_tcp *) ep;
+ grpc_tcp *tcp = (grpc_tcp *)ep;
tcp_unref(tcp);
}
@@ -410,13 +406,12 @@ static char *win_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
-static grpc_endpoint_vtable vtable = {win_notify_on_read, win_write,
- win_add_to_pollset, win_add_to_pollset_set,
- win_shutdown, win_destroy,
- win_get_peer};
+static grpc_endpoint_vtable vtable = {
+ win_notify_on_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
+ win_shutdown, win_destroy, win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
- grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp));
+ grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->socket = socket;
@@ -427,4 +422,4 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
return &tcp->base;
}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/tcp_windows.h b/src/core/iomgr/tcp_windows.h
index 7e301db250..deb3e48293 100644
--- a/src/core/iomgr/tcp_windows.h
+++ b/src/core/iomgr/tcp_windows.h
@@ -54,4 +54,4 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string);
int grpc_tcp_prepare_socket(SOCKET sock);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_WINDOWS_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_WINDOWS_H */
diff --git a/src/core/iomgr/time_averaged_stats.h b/src/core/iomgr/time_averaged_stats.h
index 13894b2640..e6dec1b4cd 100644
--- a/src/core/iomgr/time_averaged_stats.h
+++ b/src/core/iomgr/time_averaged_stats.h
@@ -85,4 +85,4 @@ void grpc_time_averaged_stats_add_sample(grpc_time_averaged_stats *stats,
value. */
double grpc_time_averaged_stats_update_average(grpc_time_averaged_stats *stats);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_TIME_AVERAGED_STATS_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_TIME_AVERAGED_STATS_H */
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
new file mode 100644
index 0000000000..6429c38b28
--- /dev/null
+++ b/src/core/iomgr/udp_server.c
@@ -0,0 +1,440 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_SOCKET
+
+#include "src/core/iomgr/udp_server.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/resolve_address.h"
+#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/iomgr/socket_utils_posix.h"
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+
+#define INIT_PORT_CAP 2
+
+/* one listening port */
+typedef struct {
+ int fd;
+ grpc_fd *emfd;
+ grpc_udp_server *server;
+ union {
+ gpr_uint8 untyped[GRPC_MAX_SOCKADDR_SIZE];
+ struct sockaddr sockaddr;
+ struct sockaddr_un un;
+ } addr;
+ int addr_len;
+ grpc_iomgr_closure read_closure;
+ grpc_iomgr_closure destroyed_closure;
+ grpc_udp_server_read_cb read_cb;
+} server_port;
+
+static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
+ struct stat st;
+
+ if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) {
+ unlink(un->sun_path);
+ }
+}
+
+/* the overall server */
+struct grpc_udp_server {
+ grpc_udp_server_cb cb;
+ void *cb_arg;
+
+ gpr_mu mu;
+ gpr_cv cv;
+
+ /* active port count: how many ports are actually still listening */
+ size_t active_ports;
+ /* destroyed port count: how many ports are completely destroyed */
+ size_t destroyed_ports;
+
+ /* is this server shutting down? (boolean) */
+ int shutdown;
+
+ /* all listening ports */
+ server_port *ports;
+ size_t nports;
+ size_t port_capacity;
+
+ /* shutdown callback */
+ void (*shutdown_complete)(void *);
+ void *shutdown_complete_arg;
+
+ /* all pollsets interested in new connections */
+ grpc_pollset **pollsets;
+ /* number of pollsets in the pollsets array */
+ size_t pollset_count;
+};
+
+grpc_udp_server *grpc_udp_server_create(void) {
+ grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server));
+ gpr_mu_init(&s->mu);
+ gpr_cv_init(&s->cv);
+ s->active_ports = 0;
+ s->destroyed_ports = 0;
+ s->shutdown = 0;
+ s->cb = NULL;
+ s->cb_arg = NULL;
+ s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
+ s->nports = 0;
+ s->port_capacity = INIT_PORT_CAP;
+
+ return s;
+}
+
+static void finish_shutdown(grpc_udp_server *s) {
+ s->shutdown_complete(s->shutdown_complete_arg);
+
+ gpr_mu_destroy(&s->mu);
+ gpr_cv_destroy(&s->cv);
+
+ gpr_free(s->ports);
+ gpr_free(s);
+}
+
+static void destroyed_port(void *server, int success) {
+ grpc_udp_server *s = server;
+ gpr_mu_lock(&s->mu);
+ s->destroyed_ports++;
+ if (s->destroyed_ports == s->nports) {
+ gpr_mu_unlock(&s->mu);
+ finish_shutdown(s);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ }
+}
+
+static void dont_care_about_shutdown_completion(void *ignored) {}
+
+/* called when all listening endpoints have been shutdown, so no further
+ events will be received on them - at this point it's safe to destroy
+ things */
+static void deactivated_all_ports(grpc_udp_server *s) {
+ size_t i;
+
+ /* delete ALL the things */
+ gpr_mu_lock(&s->mu);
+
+ if (!s->shutdown) {
+ gpr_mu_unlock(&s->mu);
+ return;
+ }
+
+ if (s->nports) {
+ for (i = 0; i < s->nports; i++) {
+ server_port *sp = &s->ports[i];
+ if (sp->addr.sockaddr.sa_family == AF_UNIX) {
+ unlink_if_unix_domain_socket(&sp->addr.un);
+ }
+ sp->destroyed_closure.cb = destroyed_port;
+ sp->destroyed_closure.cb_arg = s;
+ grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown");
+ }
+ gpr_mu_unlock(&s->mu);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ finish_shutdown(s);
+ }
+}
+
+void grpc_udp_server_destroy(
+ grpc_udp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg),
+ void *shutdown_complete_arg) {
+ size_t i;
+ gpr_mu_lock(&s->mu);
+
+ GPR_ASSERT(!s->shutdown);
+ s->shutdown = 1;
+
+ s->shutdown_complete = shutdown_complete
+ ? shutdown_complete
+ : dont_care_about_shutdown_completion;
+ s->shutdown_complete_arg = shutdown_complete_arg;
+
+ /* shutdown all fd's */
+ if (s->active_ports) {
+ for (i = 0; i < s->nports; i++) {
+ grpc_fd_shutdown(s->ports[i].emfd);
+ }
+ gpr_mu_unlock(&s->mu);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ deactivated_all_ports(s);
+ }
+}
+
+/* Prepare a recently-created socket for listening. */
+static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) {
+ struct sockaddr_storage sockname_temp;
+ socklen_t sockname_len;
+ int get_local_ip;
+ int rc;
+
+ if (fd < 0) {
+ goto error;
+ }
+
+ get_local_ip = 1;
+ rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
+ sizeof(get_local_ip));
+ if (rc == 0 && addr->sa_family == AF_INET6) {
+#if !TARGET_OS_IPHONE
+ rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
+ sizeof(get_local_ip));
+#endif
+ }
+
+ if (bind(fd, addr, addr_len) < 0) {
+ char *addr_str;
+ grpc_sockaddr_to_string(&addr_str, addr, 0);
+ gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
+ gpr_free(addr_str);
+ goto error;
+ }
+
+ sockname_len = sizeof(sockname_temp);
+ if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) {
+ goto error;
+ }
+
+ return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+
+error:
+ if (fd >= 0) {
+ close(fd);
+ }
+ return -1;
+}
+
+/* event manager callback when reads are ready */
+static void on_read(void *arg, int success) {
+ server_port *sp = arg;
+
+ if (success == 0) {
+ gpr_mu_lock(&sp->server->mu);
+ if (0 == --sp->server->active_ports) {
+ gpr_mu_unlock(&sp->server->mu);
+ deactivated_all_ports(sp->server);
+ } else {
+ gpr_mu_unlock(&sp->server->mu);
+ }
+ return;
+ }
+
+ /* Tell the registered callback that data is available to read. */
+ GPR_ASSERT(sp->read_cb);
+ sp->read_cb(sp->fd, sp->server->cb, sp->server->cb_arg);
+
+ /* Re-arm the notification event so we get another chance to read. */
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+}
+
+static int add_socket_to_server(grpc_udp_server *s, int fd,
+ const struct sockaddr *addr, int addr_len,
+ grpc_udp_server_read_cb read_cb) {
+ server_port *sp;
+ int port;
+ char *addr_str;
+ char *name;
+
+ port = prepare_socket(fd, addr, addr_len);
+ if (port >= 0) {
+ grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
+ gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
+ gpr_mu_lock(&s->mu);
+ GPR_ASSERT(!s->cb && "must add ports before starting server");
+ /* append it to the list under a lock */
+ if (s->nports == s->port_capacity) {
+ s->port_capacity *= 2;
+ s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity);
+ }
+ sp = &s->ports[s->nports++];
+ sp->server = s;
+ sp->fd = fd;
+ sp->emfd = grpc_fd_create(fd, name);
+ memcpy(sp->addr.untyped, addr, addr_len);
+ sp->addr_len = addr_len;
+ sp->read_cb = read_cb;
+ GPR_ASSERT(sp->emfd);
+ gpr_mu_unlock(&s->mu);
+ }
+
+ return port;
+}
+
+int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int addr_len,
+ grpc_udp_server_read_cb read_cb) {
+ int allocated_port1 = -1;
+ int allocated_port2 = -1;
+ unsigned i;
+ int fd;
+ grpc_dualstack_mode dsmode;
+ struct sockaddr_in6 addr6_v4mapped;
+ struct sockaddr_in wild4;
+ struct sockaddr_in6 wild6;
+ struct sockaddr_in addr4_copy;
+ struct sockaddr *allocated_addr = NULL;
+ struct sockaddr_storage sockname_temp;
+ socklen_t sockname_len;
+ int port;
+
+ if (((struct sockaddr *)addr)->sa_family == AF_UNIX) {
+ unlink_if_unix_domain_socket(addr);
+ }
+
+ /* Check if this is a wildcard port, and if so, try to keep the port the same
+ as some previously created listener. */
+ if (grpc_sockaddr_get_port(addr) == 0) {
+ for (i = 0; i < s->nports; i++) {
+ sockname_len = sizeof(sockname_temp);
+ if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp,
+ &sockname_len)) {
+ port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ if (port > 0) {
+ allocated_addr = malloc(addr_len);
+ memcpy(allocated_addr, addr, addr_len);
+ grpc_sockaddr_set_port(allocated_addr, port);
+ addr = allocated_addr;
+ break;
+ }
+ }
+ }
+ }
+
+ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
+ addr = (const struct sockaddr *)&addr6_v4mapped;
+ addr_len = sizeof(addr6_v4mapped);
+ }
+
+ /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
+ if (grpc_sockaddr_is_wildcard(addr, &port)) {
+ grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
+
+ /* Try listening on IPv6 first. */
+ addr = (struct sockaddr *)&wild6;
+ addr_len = sizeof(wild6);
+ fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
+ allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
+ goto done;
+ }
+
+ /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
+ if (port == 0 && allocated_port1 > 0) {
+ grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1);
+ }
+ addr = (struct sockaddr *)&wild4;
+ addr_len = sizeof(wild4);
+ }
+
+ fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
+ if (fd < 0) {
+ gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
+ }
+ if (dsmode == GRPC_DSMODE_IPV4 &&
+ grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
+ addr = (struct sockaddr *)&addr4_copy;
+ addr_len = sizeof(addr4_copy);
+ }
+ allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+
+done:
+ gpr_free(allocated_addr);
+ return allocated_port1 >= 0 ? allocated_port1 : allocated_port2;
+}
+
+int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) {
+ return (index < s->nports) ? s->ports[index].fd : -1;
+}
+
+void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets,
+ size_t pollset_count,
+ grpc_udp_server_cb new_transport_cb, void *cb_arg) {
+ size_t i, j;
+ GPR_ASSERT(new_transport_cb);
+ gpr_mu_lock(&s->mu);
+ GPR_ASSERT(!s->cb);
+ GPR_ASSERT(s->active_ports == 0);
+ s->cb = new_transport_cb;
+ s->cb_arg = cb_arg;
+ s->pollsets = pollsets;
+ for (i = 0; i < s->nports; i++) {
+ for (j = 0; j < pollset_count; j++) {
+ grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd);
+ }
+ s->ports[i].read_closure.cb = on_read;
+ s->ports[i].read_closure.cb_arg = &s->ports[i];
+ grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure);
+ s->active_ports++;
+ }
+ gpr_mu_unlock(&s->mu);
+}
+
+/* TODO(rjshade): Add a test for this method. */
+void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len,
+ const struct sockaddr *peer_address) {
+ int rc;
+ rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, sizeof(peer_address));
+ if (rc < 0) {
+ gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno));
+ }
+}
+
+#endif
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
new file mode 100644
index 0000000000..fcc4ba6e97
--- /dev/null
+++ b/src/core/iomgr/udp_server.h
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H
+#define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H
+
+#include "src/core/iomgr/endpoint.h"
+
+/* Forward decl of grpc_udp_server */
+typedef struct grpc_udp_server grpc_udp_server;
+
+/* New server callback: ep is the newly connected connection */
+typedef void (*grpc_udp_server_cb)(void *arg, grpc_endpoint *ep);
+
+/* Called when data is available to read from the socket. */
+typedef void (*grpc_udp_server_read_cb)(int fd,
+ grpc_udp_server_cb new_transport_cb,
+ void *cb_arg);
+
+/* Create a server, initially not bound to any ports */
+grpc_udp_server *grpc_udp_server_create(void);
+
+/* Start listening to bound ports */
+void grpc_udp_server_start(grpc_udp_server *server, grpc_pollset **pollsets,
+ size_t pollset_count, grpc_udp_server_cb cb,
+ void *cb_arg);
+
+int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
+
+/* Add a port to the server, returning port number on success, or negative
+ on failure.
+
+ The :: and 0.0.0.0 wildcard addresses are treated identically, accepting
+ both IPv4 and IPv6 connections, but :: is the preferred style. This usually
+ creates one socket, but possibly two on systems which support IPv6,
+ but not dualstack sockets. */
+
+/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
+ all of the multiple socket port matching logic in one place */
+int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int addr_len,
+ grpc_udp_server_read_cb read_cb);
+
+void grpc_udp_server_destroy(grpc_udp_server *server,
+ void (*shutdown_done)(void *shutdown_done_arg),
+ void *shutdown_done_arg);
+
+/* Write the contents of buffer to the underlying UDP socket. */
+/*
+void grpc_udp_server_write(grpc_udp_server *s,
+ const char *buffer,
+ int buf_len,
+ const struct sockaddr* to);
+ */
+
+#endif /* GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H */
diff --git a/src/core/iomgr/wakeup_fd_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c
index 52912235f8..08fdc74f17 100644
--- a/src/core/iomgr/wakeup_fd_eventfd.c
+++ b/src/core/iomgr/wakeup_fd_eventfd.c
@@ -75,8 +75,7 @@ static int eventfd_check_availability(void) {
}
const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = {
- eventfd_create, eventfd_consume, eventfd_wakeup, eventfd_destroy,
- eventfd_check_availability
-};
+ eventfd_create, eventfd_consume, eventfd_wakeup, eventfd_destroy,
+ eventfd_check_availability};
#endif /* GPR_LINUX_EVENTFD */
diff --git a/src/core/iomgr/wakeup_fd_nospecial.c b/src/core/iomgr/wakeup_fd_nospecial.c
index c1038bf379..78d763c103 100644
--- a/src/core/iomgr/wakeup_fd_nospecial.c
+++ b/src/core/iomgr/wakeup_fd_nospecial.c
@@ -43,12 +43,9 @@
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <stddef.h>
-static int check_availability_invalid(void) {
- return 0;
-}
+static int check_availability_invalid(void) { return 0; }
const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = {
- NULL, NULL, NULL, NULL, check_availability_invalid
-};
+ NULL, NULL, NULL, NULL, check_availability_invalid};
-#endif /* GPR_POSIX_NO_SPECIAL_WAKEUP_FD */
+#endif /* GPR_POSIX_NO_SPECIAL_WAKEUP_FD */
diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c
index 9fc4ee2388..bd643e8061 100644
--- a/src/core/iomgr/wakeup_fd_pipe.c
+++ b/src/core/iomgr/wakeup_fd_pipe.c
@@ -94,4 +94,4 @@ const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = {
pipe_init, pipe_consume, pipe_wakeup, pipe_destroy,
pipe_check_availability};
-#endif /* GPR_POSIX_WAKUP_FD */
+#endif /* GPR_POSIX_WAKUP_FD */
diff --git a/src/core/iomgr/wakeup_fd_pipe.h b/src/core/iomgr/wakeup_fd_pipe.h
index aa8f977ddb..01a13a97c0 100644
--- a/src/core/iomgr/wakeup_fd_pipe.h
+++ b/src/core/iomgr/wakeup_fd_pipe.h
@@ -38,4 +38,4 @@
extern grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable;
-#endif /* GRPC_INTERNAL_CORE_IOMGR_WAKEUP_FD_PIPE_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_WAKEUP_FD_PIPE_H */
diff --git a/src/core/iomgr/wakeup_fd_posix.c b/src/core/iomgr/wakeup_fd_posix.c
index e48f5223fa..d09fb78d12 100644
--- a/src/core/iomgr/wakeup_fd_posix.c
+++ b/src/core/iomgr/wakeup_fd_posix.c
@@ -53,9 +53,7 @@ void grpc_wakeup_fd_global_init_force_fallback(void) {
wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
}
-void grpc_wakeup_fd_global_destroy(void) {
- wakeup_fd_vtable = NULL;
-}
+void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; }
void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->init(fd_info);
@@ -73,4 +71,4 @@ void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->destroy(fd_info);
}
-#endif /* GPR_POSIX_WAKEUP_FD */
+#endif /* GPR_POSIX_WAKEUP_FD */
diff --git a/src/core/iomgr/wakeup_fd_posix.h b/src/core/iomgr/wakeup_fd_posix.h
index a4da4df51f..b6c086900d 100644
--- a/src/core/iomgr/wakeup_fd_posix.h
+++ b/src/core/iomgr/wakeup_fd_posix.h
@@ -96,4 +96,4 @@ void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info);
* wakeup_fd_nospecial.c if no such implementation exists. */
extern const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable;
-#endif /* GRPC_INTERNAL_CORE_IOMGR_WAKEUP_FD_POSIX_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_WAKEUP_FD_POSIX_H */
diff --git a/src/core/json/json.h b/src/core/json/json.h
index cac18ad885..573584bf6f 100644
--- a/src/core/json/json.h
+++ b/src/core/json/json.h
@@ -85,4 +85,4 @@ char* grpc_json_dump_to_string(grpc_json* json, int indent);
grpc_json* grpc_json_create(grpc_json_type type);
void grpc_json_destroy(grpc_json* json);
-#endif /* GRPC_INTERNAL_CORE_JSON_JSON_H */
+#endif /* GRPC_INTERNAL_CORE_JSON_JSON_H */
diff --git a/src/core/json/json_common.h b/src/core/json/json_common.h
index 84bf375916..481695b38b 100644
--- a/src/core/json/json_common.h
+++ b/src/core/json/json_common.h
@@ -46,4 +46,4 @@ typedef enum {
GRPC_JSON_TOP_LEVEL
} grpc_json_type;
-#endif /* GRPC_INTERNAL_CORE_JSON_JSON_COMMON_H */
+#endif /* GRPC_INTERNAL_CORE_JSON_JSON_COMMON_H */
diff --git a/src/core/json/json_reader.c b/src/core/json/json_reader.c
index c14094c290..c22d4edd47 100644
--- a/src/core/json/json_reader.c
+++ b/src/core/json/json_reader.c
@@ -42,27 +42,26 @@ static void json_reader_string_clear(grpc_json_reader* reader) {
}
static void json_reader_string_add_char(grpc_json_reader* reader,
- gpr_uint32 c) {
+ gpr_uint32 c) {
reader->vtable->string_add_char(reader->userdata, c);
}
static void json_reader_string_add_utf32(grpc_json_reader* reader,
- gpr_uint32 utf32) {
+ gpr_uint32 utf32) {
reader->vtable->string_add_utf32(reader->userdata, utf32);
}
-static gpr_uint32
- grpc_json_reader_read_char(grpc_json_reader* reader) {
+static gpr_uint32 grpc_json_reader_read_char(grpc_json_reader* reader) {
return reader->vtable->read_char(reader->userdata);
}
static void json_reader_container_begins(grpc_json_reader* reader,
- grpc_json_type type) {
+ grpc_json_type type) {
reader->vtable->container_begins(reader->userdata, type);
}
-static grpc_json_type
- grpc_json_reader_container_ends(grpc_json_reader* reader) {
+static grpc_json_type grpc_json_reader_container_ends(
+ grpc_json_reader* reader) {
return reader->vtable->container_ends(reader->userdata);
}
@@ -101,8 +100,9 @@ void grpc_json_reader_init(grpc_json_reader* reader,
}
int grpc_json_reader_is_complete(grpc_json_reader* reader) {
- return ((reader->depth == 0) && ((reader->state == GRPC_JSON_STATE_END) ||
- (reader->state == GRPC_JSON_STATE_VALUE_END)));
+ return ((reader->depth == 0) &&
+ ((reader->state == GRPC_JSON_STATE_END) ||
+ (reader->state == GRPC_JSON_STATE_VALUE_END)));
}
grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) {
@@ -143,7 +143,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) {
case GRPC_JSON_STATE_OBJECT_KEY_STRING:
case GRPC_JSON_STATE_VALUE_STRING:
if (c != ' ') return GRPC_JSON_PARSE_ERROR;
- if (reader->unicode_high_surrogate != 0) return GRPC_JSON_PARSE_ERROR;
+ if (reader->unicode_high_surrogate != 0)
+ return GRPC_JSON_PARSE_ERROR;
json_reader_string_add_char(reader, c);
break;
@@ -169,7 +170,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) {
switch (reader->state) {
case GRPC_JSON_STATE_OBJECT_KEY_STRING:
case GRPC_JSON_STATE_VALUE_STRING:
- if (reader->unicode_high_surrogate != 0) return GRPC_JSON_PARSE_ERROR;
+ if (reader->unicode_high_surrogate != 0)
+ return GRPC_JSON_PARSE_ERROR;
json_reader_string_add_char(reader, c);
break;
@@ -253,7 +255,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) {
/* This is the \\ case. */
case GRPC_JSON_STATE_STRING_ESCAPE:
- if (reader->unicode_high_surrogate != 0) return GRPC_JSON_PARSE_ERROR;
+ if (reader->unicode_high_surrogate != 0)
+ return GRPC_JSON_PARSE_ERROR;
json_reader_string_add_char(reader, '\\');
if (reader->escaped_string_was_key) {
reader->state = GRPC_JSON_STATE_OBJECT_KEY_STRING;
@@ -276,7 +279,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) {
break;
case GRPC_JSON_STATE_OBJECT_KEY_STRING:
- if (reader->unicode_high_surrogate != 0) return GRPC_JSON_PARSE_ERROR;
+ if (reader->unicode_high_surrogate != 0)
+ return GRPC_JSON_PARSE_ERROR;
if (c == '"') {
reader->state = GRPC_JSON_STATE_OBJECT_KEY_END;
json_reader_set_key(reader);
@@ -288,7 +292,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) {
break;
case GRPC_JSON_STATE_VALUE_STRING:
- if (reader->unicode_high_surrogate != 0) return GRPC_JSON_PARSE_ERROR;
+ if (reader->unicode_high_surrogate != 0)
+ return GRPC_JSON_PARSE_ERROR;
if (c == '"') {
reader->state = GRPC_JSON_STATE_VALUE_END;
json_reader_set_string(reader);
@@ -438,7 +443,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader* reader) {
if (reader->unicode_high_surrogate == 0)
return GRPC_JSON_PARSE_ERROR;
utf32 = 0x10000;
- utf32 += (gpr_uint32)((reader->unicode_high_surrogate - 0xd800) * 0x400);
+ utf32 += (gpr_uint32)(
+ (reader->unicode_high_surrogate - 0xd800) * 0x400);
utf32 += (gpr_uint32)(reader->unicode_char - 0xdc00);
json_reader_string_add_utf32(reader, utf32);
reader->unicode_high_surrogate = 0;
diff --git a/src/core/json/json_reader.h b/src/core/json/json_reader.h
index b1a5ace8fb..4d5487f790 100644
--- a/src/core/json/json_reader.h
+++ b/src/core/json/json_reader.h
@@ -157,4 +157,4 @@ void grpc_json_reader_init(grpc_json_reader* reader,
*/
int grpc_json_reader_is_complete(grpc_json_reader* reader);
-#endif /* GRPC_INTERNAL_CORE_JSON_JSON_READER_H */
+#endif /* GRPC_INTERNAL_CORE_JSON_JSON_READER_H */
diff --git a/src/core/json/json_string.c b/src/core/json/json_string.c
index 03c1099167..e6622ec461 100644
--- a/src/core/json/json_string.c
+++ b/src/core/json/json_string.c
@@ -73,7 +73,6 @@ typedef struct {
size_t allocated;
} json_writer_userdata;
-
/* This function checks if there's enough space left in the output buffer,
* and will enlarge it if necessary. We're only allocating chunks of 256
* bytes at a time (or multiples thereof).
@@ -97,8 +96,8 @@ static void json_writer_output_char(void* userdata, char c) {
state->free_space--;
}
-static void json_writer_output_string_with_len(void* userdata,
- const char* str, size_t len) {
+static void json_writer_output_string_with_len(void* userdata, const char* str,
+ size_t len) {
json_writer_userdata* state = userdata;
json_writer_output_check(userdata, len);
memcpy(state->output + state->string_len, str, len);
@@ -106,8 +105,7 @@ static void json_writer_output_string_with_len(void* userdata,
state->free_space -= len;
}
-static void json_writer_output_string(void* userdata,
- const char* str) {
+static void json_writer_output_string(void* userdata, const char* str) {
size_t len = strlen(str);
json_writer_output_string_with_len(userdata, str, len);
}
@@ -184,8 +182,7 @@ static gpr_uint32 json_reader_read_char(void* userdata) {
/* Helper function to create a new grpc_json object and link it into
* our tree-in-progress inside our opaque structure.
*/
-static grpc_json* json_create_and_link(void* userdata,
- grpc_json_type type) {
+static grpc_json* json_create_and_link(void* userdata, grpc_json_type type) {
json_reader_userdata* state = userdata;
grpc_json* json = grpc_json_create(type);
@@ -201,7 +198,7 @@ static grpc_json* json_create_and_link(void* userdata,
json->parent->child = json;
}
if (json->parent->type == GRPC_JSON_OBJECT) {
- json->key = (char*) state->key;
+ json->key = (char*)state->key;
}
}
if (!state->top) {
@@ -261,13 +258,13 @@ static void json_reader_set_key(void* userdata) {
static void json_reader_set_string(void* userdata) {
json_reader_userdata* state = userdata;
grpc_json* json = json_create_and_link(userdata, GRPC_JSON_STRING);
- json->value = (char*) state->string;
+ json->value = (char*)state->string;
}
static int json_reader_set_number(void* userdata) {
json_reader_userdata* state = userdata;
grpc_json* json = json_create_and_link(userdata, GRPC_JSON_NUMBER);
- json->value = (char*) state->string;
+ json->value = (char*)state->string;
return 1;
}
@@ -287,32 +284,25 @@ static void json_reader_set_null(void* userdata) {
}
static grpc_json_reader_vtable reader_vtable = {
- json_reader_string_clear,
- json_reader_string_add_char,
- json_reader_string_add_utf32,
- json_reader_read_char,
- json_reader_container_begins,
- json_reader_container_ends,
- json_reader_set_key,
- json_reader_set_string,
- json_reader_set_number,
- json_reader_set_true,
- json_reader_set_false,
- json_reader_set_null
-};
+ json_reader_string_clear, json_reader_string_add_char,
+ json_reader_string_add_utf32, json_reader_read_char,
+ json_reader_container_begins, json_reader_container_ends,
+ json_reader_set_key, json_reader_set_string,
+ json_reader_set_number, json_reader_set_true,
+ json_reader_set_false, json_reader_set_null};
/* And finally, let's define our public API. */
grpc_json* grpc_json_parse_string_with_len(char* input, size_t size) {
grpc_json_reader reader;
json_reader_userdata state;
- grpc_json *json = NULL;
+ grpc_json* json = NULL;
grpc_json_reader_status status;
if (!input) return NULL;
state.top = state.current_container = state.current_value = NULL;
state.string = state.key = NULL;
- state.string_ptr = state.input = (gpr_uint8*) input;
+ state.string_ptr = state.input = (gpr_uint8*)input;
state.remaining_input = size;
grpc_json_reader_init(&reader, &reader_vtable, &state);
@@ -333,8 +323,8 @@ grpc_json* grpc_json_parse_string(char* input) {
return grpc_json_parse_string_with_len(input, UNBOUND_JSON_STRING_LENGTH);
}
-static void json_dump_recursive(grpc_json_writer* writer,
- grpc_json* json, int in_object) {
+static void json_dump_recursive(grpc_json_writer* writer, grpc_json* json,
+ int in_object) {
while (json) {
if (in_object) grpc_json_writer_object_key(writer, json->key);
@@ -370,10 +360,8 @@ static void json_dump_recursive(grpc_json_writer* writer,
}
static grpc_json_writer_vtable writer_vtable = {
- json_writer_output_char,
- json_writer_output_string,
- json_writer_output_string_with_len
-};
+ json_writer_output_char, json_writer_output_string,
+ json_writer_output_string_with_len};
char* grpc_json_dump_to_string(grpc_json* json, int indent) {
grpc_json_writer writer;
diff --git a/src/core/json/json_writer.c b/src/core/json/json_writer.c
index bed9a9bfa5..ca9c835825 100644
--- a/src/core/json/json_writer.c
+++ b/src/core/json/json_writer.c
@@ -41,11 +41,13 @@ static void json_writer_output_char(grpc_json_writer* writer, char c) {
writer->vtable->output_char(writer->userdata, c);
}
-static void json_writer_output_string(grpc_json_writer* writer, const char* str) {
+static void json_writer_output_string(grpc_json_writer* writer,
+ const char* str) {
writer->vtable->output_string(writer->userdata, str);
}
-static void json_writer_output_string_with_len(grpc_json_writer* writer, const char* str, size_t len) {
+static void json_writer_output_string_with_len(grpc_json_writer* writer,
+ const char* str, size_t len) {
writer->vtable->output_string_with_len(writer->userdata, str, len);
}
@@ -58,8 +60,7 @@ void grpc_json_writer_init(grpc_json_writer* writer, int indent,
writer->userdata = userdata;
}
-static void json_writer_output_indent(
- grpc_json_writer* writer) {
+static void json_writer_output_indent(grpc_json_writer* writer) {
static const char spacesstr[] =
" "
" "
@@ -99,14 +100,15 @@ static void json_writer_value_end(grpc_json_writer* writer) {
}
}
-static void json_writer_escape_utf16(grpc_json_writer* writer, gpr_uint16 utf16) {
+static void json_writer_escape_utf16(grpc_json_writer* writer,
+ gpr_uint16 utf16) {
static const char hex[] = "0123456789abcdef";
json_writer_output_string_with_len(writer, "\\u", 2);
json_writer_output_char(writer, hex[(utf16 >> 12) & 0x0f]);
json_writer_output_char(writer, hex[(utf16 >> 8) & 0x0f]);
json_writer_output_char(writer, hex[(utf16 >> 4) & 0x0f]);
- json_writer_output_char(writer, hex[(utf16) & 0x0f]);
+ json_writer_output_char(writer, hex[(utf16)&0x0f]);
}
static void json_writer_escape_string(grpc_json_writer* writer,
@@ -173,8 +175,8 @@ static void json_writer_escape_string(grpc_json_writer* writer,
* Any other range is technically reserved for future usage, so if we
* don't want the software to break in the future, we have to allow
* anything else. The first non-unicode character is 0x110000. */
- if (((utf32 >= 0xd800) && (utf32 <= 0xdfff)) ||
- (utf32 >= 0x110000)) break;
+ if (((utf32 >= 0xd800) && (utf32 <= 0xdfff)) || (utf32 >= 0x110000))
+ break;
if (utf32 >= 0x10000) {
/* If utf32 contains a character that is above 0xffff, it needs to be
* broken down into a utf-16 surrogate pair. A surrogate pair is first
@@ -194,7 +196,8 @@ static void json_writer_escape_string(grpc_json_writer* writer,
*/
utf32 -= 0x10000;
json_writer_escape_utf16(writer, (gpr_uint16)(0xd800 | (utf32 >> 10)));
- json_writer_escape_utf16(writer, (gpr_uint16)(0xdc00 | (utf32 & 0x3ff)));
+ json_writer_escape_utf16(writer,
+ (gpr_uint16)(0xdc00 | (utf32 & 0x3ff)));
} else {
json_writer_escape_utf16(writer, (gpr_uint16)utf32);
}
@@ -204,7 +207,8 @@ static void json_writer_escape_string(grpc_json_writer* writer,
json_writer_output_char(writer, '"');
}
-void grpc_json_writer_container_begins(grpc_json_writer* writer, grpc_json_type type) {
+void grpc_json_writer_container_begins(grpc_json_writer* writer,
+ grpc_json_type type) {
if (!writer->got_key) json_writer_value_end(writer);
json_writer_output_indent(writer);
json_writer_output_char(writer, type == GRPC_JSON_OBJECT ? '{' : '[');
@@ -213,7 +217,8 @@ void grpc_json_writer_container_begins(grpc_json_writer* writer, grpc_json_type
writer->depth++;
}
-void grpc_json_writer_container_ends(grpc_json_writer* writer, grpc_json_type type) {
+void grpc_json_writer_container_ends(grpc_json_writer* writer,
+ grpc_json_type type) {
if (writer->indent && !writer->container_empty)
json_writer_output_char(writer, '\n');
writer->depth--;
@@ -238,14 +243,16 @@ void grpc_json_writer_value_raw(grpc_json_writer* writer, const char* string) {
writer->got_key = 0;
}
-void grpc_json_writer_value_raw_with_len(grpc_json_writer* writer, const char* string, size_t len) {
+void grpc_json_writer_value_raw_with_len(grpc_json_writer* writer,
+ const char* string, size_t len) {
if (!writer->got_key) json_writer_value_end(writer);
json_writer_output_indent(writer);
json_writer_output_string_with_len(writer, string, len);
writer->got_key = 0;
}
-void grpc_json_writer_value_string(grpc_json_writer* writer, const char* string) {
+void grpc_json_writer_value_string(grpc_json_writer* writer,
+ const char* string) {
if (!writer->got_key) json_writer_value_end(writer);
json_writer_output_indent(writer);
json_writer_escape_string(writer, string);
diff --git a/src/core/json/json_writer.h b/src/core/json/json_writer.h
index dfa61a5fef..a299dfabf8 100644
--- a/src/core/json/json_writer.h
+++ b/src/core/json/json_writer.h
@@ -78,16 +78,20 @@ void grpc_json_writer_init(grpc_json_writer* writer, int indent,
grpc_json_writer_vtable* vtable, void* userdata);
/* Signals the beginning of a container. */
-void grpc_json_writer_container_begins(grpc_json_writer* writer, grpc_json_type type);
+void grpc_json_writer_container_begins(grpc_json_writer* writer,
+ grpc_json_type type);
/* Signals the end of a container. */
-void grpc_json_writer_container_ends(grpc_json_writer* writer, grpc_json_type type);
+void grpc_json_writer_container_ends(grpc_json_writer* writer,
+ grpc_json_type type);
/* Writes down an object key for the next value. */
void grpc_json_writer_object_key(grpc_json_writer* writer, const char* string);
/* Sets a raw value. Useful for numbers. */
void grpc_json_writer_value_raw(grpc_json_writer* writer, const char* string);
/* Sets a raw value with its length. Useful for values like true or false. */
-void grpc_json_writer_value_raw_with_len(grpc_json_writer* writer, const char* string, size_t len);
+void grpc_json_writer_value_raw_with_len(grpc_json_writer* writer,
+ const char* string, size_t len);
/* Sets a string value. It'll be escaped, and utf-8 validated. */
-void grpc_json_writer_value_string(grpc_json_writer* writer, const char* string);
+void grpc_json_writer_value_string(grpc_json_writer* writer,
+ const char* string);
-#endif /* GRPC_INTERNAL_CORE_JSON_JSON_WRITER_H */
+#endif /* GRPC_INTERNAL_CORE_JSON_JSON_WRITER_H */
diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h
index 036d02f187..92dbab9042 100644
--- a/src/core/profiling/timers.h
+++ b/src/core/profiling/timers.h
@@ -88,7 +88,7 @@ enum grpc_profiling_tags {
} while (0)
#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \
- do { \
+ do { \
} while (0)
#define GRPC_TIMER_BEGIN(tag, id) \
diff --git a/src/core/security/auth_filters.h b/src/core/security/auth_filters.h
index ff921690e0..c179b54bec 100644
--- a/src/core/security/auth_filters.h
+++ b/src/core/security/auth_filters.h
@@ -39,4 +39,4 @@
extern const grpc_channel_filter grpc_client_auth_filter;
extern const grpc_channel_filter grpc_server_auth_filter;
-#endif /* GRPC_INTERNAL_CORE_SECURITY_AUTH_FILTERS_H */
+#endif /* GRPC_INTERNAL_CORE_SECURITY_AUTH_FILTERS_H */
diff --git a/src/core/security/base64.h b/src/core/security/base64.h
index b9abc07b52..31ae982691 100644
--- a/src/core/security/base64.h
+++ b/src/core/security/base64.h
@@ -49,4 +49,4 @@ gpr_slice grpc_base64_decode(const char *b64, int url_safe);
gpr_slice grpc_base64_decode_with_len(const char *b64, size_t b64_len,
int url_safe);
-#endif /* GRPC_INTERNAL_CORE_SECURITY_BASE64_H */
+#endif /* GRPC_INTERNAL_CORE_SECURITY_BASE64_H */
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index 410852da52..8e63978b82 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -200,7 +200,7 @@ static void auth_start_transport_op(grpc_call_element *elem,
channel_data *chand = elem->channel_data;
grpc_linked_mdelem *l;
size_t i;
- grpc_client_security_context* sec_ctx = NULL;
+ grpc_client_security_context *sec_ctx = NULL;
if (calld->security_context_set == 0) {
calld->security_context_set = 1;
@@ -316,9 +316,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
(grpc_channel_security_connector *)GRPC_SECURITY_CONNECTOR_REF(
sc, "client_auth_filter");
chand->md_ctx = metadata_context;
- chand->authority_string = grpc_mdstr_from_string(chand->md_ctx, ":authority", 0);
+ chand->authority_string =
+ grpc_mdstr_from_string(chand->md_ctx, ":authority", 0);
chand->path_string = grpc_mdstr_from_string(chand->md_ctx, ":path", 0);
- chand->error_msg_key = grpc_mdstr_from_string(chand->md_ctx, "grpc-message", 0);
+ chand->error_msg_key =
+ grpc_mdstr_from_string(chand->md_ctx, "grpc-message", 0);
chand->status_key = grpc_mdstr_from_string(chand->md_ctx, "grpc-status", 0);
}
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index 6421ce673d..8852cab3e7 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -793,16 +793,16 @@ void on_simulated_token_fetch_done(void *user_data, int success) {
(grpc_credentials_metadata_request *)user_data;
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds;
GPR_ASSERT(success);
- r->cb(r->user_data, c->md_store->entries,
- c->md_store->num_entries, GRPC_CREDENTIALS_OK);
+ r->cb(r->user_data, c->md_store->entries, c->md_store->num_entries,
+ GRPC_CREDENTIALS_OK);
grpc_credentials_metadata_request_destroy(r);
}
static void md_only_test_get_request_metadata(grpc_credentials *creds,
- grpc_pollset *pollset,
- const char *service_url,
- grpc_credentials_metadata_cb cb,
- void *user_data) {
+ grpc_pollset *pollset,
+ const char *service_url,
+ grpc_credentials_metadata_cb cb,
+ void *user_data) {
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds;
if (c->is_async) {
@@ -854,10 +854,10 @@ static int access_token_has_request_metadata_only(
}
static void access_token_get_request_metadata(grpc_credentials *creds,
- grpc_pollset *pollset,
- const char *service_url,
- grpc_credentials_metadata_cb cb,
- void *user_data) {
+ grpc_pollset *pollset,
+ const char *service_url,
+ grpc_credentials_metadata_cb cb,
+ void *user_data) {
grpc_access_token_credentials *c = (grpc_access_token_credentials *)creds;
cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK);
}
diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h
index 04736525dc..29cd1ac87f 100644
--- a/src/core/security/credentials.h
+++ b/src/core/security/credentials.h
@@ -192,8 +192,9 @@ void grpc_flush_cached_google_default_credentials(void);
/* Metadata-only credentials with the specified key and value where
asynchronicity can be simulated for testing. */
-grpc_credentials *grpc_md_only_test_credentials_create(
- const char *md_key, const char *md_value, int is_async);
+grpc_credentials *grpc_md_only_test_credentials_create(const char *md_key,
+ const char *md_value,
+ int is_async);
/* Private constructor for jwt credentials from an already parsed json key.
Takes ownership of the key. */
diff --git a/src/core/security/credentials_metadata.c b/src/core/security/credentials_metadata.c
index 22c786be56..b8a132f1ea 100644
--- a/src/core/security/credentials_metadata.c
+++ b/src/core/security/credentials_metadata.c
@@ -47,7 +47,8 @@ static void store_ensure_capacity(grpc_credentials_md_store *store) {
grpc_credentials_md_store *grpc_credentials_md_store_create(
size_t initial_capacity) {
- grpc_credentials_md_store *store = gpr_malloc(sizeof(grpc_credentials_md_store));
+ grpc_credentials_md_store *store =
+ gpr_malloc(sizeof(grpc_credentials_md_store));
memset(store, 0, sizeof(grpc_credentials_md_store));
if (initial_capacity > 0) {
store->entries = gpr_malloc(initial_capacity * sizeof(grpc_credentials_md));
@@ -98,4 +99,3 @@ void grpc_credentials_md_store_unref(grpc_credentials_md_store *store) {
gpr_free(store);
}
}
-
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index d1f228665f..3631de867a 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -115,7 +115,7 @@ static int is_stack_running_on_compute_engine(void) {
gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
while (!detector.is_done) {
grpc_pollset_worker worker;
- grpc_pollset_work(&detector.pollset, &worker,
+ grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
@@ -203,8 +203,8 @@ end:
/* Blend with default ssl credentials and add a global reference so that it
can be cached and re-served. */
grpc_credentials *ssl_creds = grpc_ssl_credentials_create(NULL, NULL);
- default_credentials = grpc_credentials_ref(grpc_composite_credentials_create(
- ssl_creds, result));
+ default_credentials = grpc_credentials_ref(
+ grpc_composite_credentials_create(ssl_creds, result));
GPR_ASSERT(default_credentials != NULL);
grpc_credentials_unref(ssl_creds);
grpc_credentials_unref(result);
diff --git a/src/core/security/json_token.h b/src/core/security/json_token.h
index 091dfefb6e..7e06864ff3 100644
--- a/src/core/security/json_token.h
+++ b/src/core/security/json_token.h
@@ -115,4 +115,4 @@ grpc_auth_refresh_token grpc_auth_refresh_token_create_from_json(
/* Destructs the object. */
void grpc_auth_refresh_token_destruct(grpc_auth_refresh_token *refresh_token);
-#endif /* GRPC_INTERNAL_CORE_SECURITY_JSON_TOKEN_H */
+#endif /* GRPC_INTERNAL_CORE_SECURITY_JSON_TOKEN_H */
diff --git a/src/core/security/jwt_verifier.h b/src/core/security/jwt_verifier.h
index 8077e24883..7a32debfcb 100644
--- a/src/core/security/jwt_verifier.h
+++ b/src/core/security/jwt_verifier.h
@@ -133,4 +133,3 @@ grpc_jwt_verifier_status grpc_jwt_claims_check(const grpc_jwt_claims *claims,
const char *audience);
#endif /* GRPC_INTERNAL_CORE_SECURITY_JWT_VERIFIER_H */
-
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 95fbf71f3d..81b3e33cb2 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -332,7 +332,7 @@ static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
}
static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep,
- grpc_pollset_set *pollset_set) {
+ grpc_pollset_set *pollset_set) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
}
diff --git a/src/core/security/secure_endpoint.h b/src/core/security/secure_endpoint.h
index 93c29b5111..c563bdd9c5 100644
--- a/src/core/security/secure_endpoint.h
+++ b/src/core/security/secure_endpoint.h
@@ -46,4 +46,4 @@ grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *to_wrap,
gpr_slice *leftover_slices, size_t leftover_nslices);
-#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_ENDPOINT_H */
+#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_ENDPOINT_H */
diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/secure_transport_setup.h
index 29025f5236..d9b802556d 100644
--- a/src/core/security/secure_transport_setup.h
+++ b/src/core/security/secure_transport_setup.h
@@ -50,4 +50,4 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
grpc_secure_transport_setup_done_cb cb,
void *user_data);
-#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H */
+#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H */
diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c
index a354536dcd..ba9ac68c5f 100644
--- a/src/core/security/security_connector.c
+++ b/src/core/security/security_connector.c
@@ -575,6 +575,16 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
if (!check_request_metadata_creds(request_metadata_creds)) {
goto error;
}
+ if (config->pem_root_certs == NULL) {
+ pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs);
+ if (pem_root_certs == NULL || pem_root_certs_size == 0) {
+ gpr_log(GPR_ERROR, "Could not get default pem root certs.");
+ goto error;
+ }
+ } else {
+ pem_root_certs = config->pem_root_certs;
+ pem_root_certs_size = config->pem_root_certs_size;
+ }
c = gpr_malloc(sizeof(grpc_ssl_channel_security_connector));
memset(c, 0, sizeof(grpc_ssl_channel_security_connector));
@@ -590,16 +600,6 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
if (overridden_target_name != NULL) {
c->overridden_target_name = gpr_strdup(overridden_target_name);
}
- if (config->pem_root_certs == NULL) {
- pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs);
- if (pem_root_certs == NULL || pem_root_certs_size == 0) {
- gpr_log(GPR_ERROR, "Could not get default pem root certs.");
- goto error;
- }
- } else {
- pem_root_certs = config->pem_root_certs;
- pem_root_certs_size = config->pem_root_certs_size;
- }
result = tsi_create_ssl_client_handshaker_factory(
config->pem_private_key, config->pem_private_key_size,
config->pem_cert_chain, config->pem_cert_chain_size, pem_root_certs,
diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c
index 1ef0fc9255..c1b434f302 100644
--- a/src/core/security/security_context.c
+++ b/src/core/security/security_context.c
@@ -204,8 +204,7 @@ int grpc_auth_context_set_peer_identity_property_name(grpc_auth_context *ctx,
return 1;
}
-int grpc_auth_context_peer_is_authenticated(
- const grpc_auth_context *ctx) {
+int grpc_auth_context_peer_is_authenticated(const grpc_auth_context *ctx) {
return ctx->peer_identity_property_name == NULL ? 0 : 1;
}
@@ -326,4 +325,3 @@ grpc_auth_metadata_processor *grpc_find_auth_metadata_processor_in_args(
}
return NULL;
}
-
diff --git a/src/core/security/security_context.h b/src/core/security/security_context.h
index 7fcd438cf6..a9a0306410 100644
--- a/src/core/security/security_context.h
+++ b/src/core/security/security_context.h
@@ -112,5 +112,4 @@ grpc_auth_metadata_processor *grpc_auth_metadata_processor_from_arg(
grpc_auth_metadata_processor *grpc_find_auth_metadata_processor_in_args(
const grpc_channel_args *args);
-#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONTEXT_H */
-
+#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONTEXT_H */
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 98d7788c83..57729be32c 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -105,24 +105,34 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) {
return md;
}
-static void on_md_processing_done(void *user_data,
- const grpc_metadata *consumed_md,
- size_t num_consumed_md, int success) {
+static void on_md_processing_done(
+ void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
+ const grpc_metadata *response_md, size_t num_response_md,
+ grpc_status_code status, const char *error_details) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- if (success) {
+ /* TODO(jboeuf): Implement support for response_md. */
+ if (response_md != NULL && num_response_md > 0) {
+ gpr_log(GPR_INFO,
+ "response_md in auth metadata processing not supported for now. "
+ "Ignoring...");
+ }
+
+ if (status == GRPC_STATUS_OK) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
grpc_metadata_batch_filter(&calld->md_op->data.metadata, remove_consumed_md,
elem);
- calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1);
} else {
- gpr_slice message = gpr_slice_from_copied_string(
- "Authentication metadata processing failed.");
+ gpr_slice message;
+ error_details = error_details != NULL
+ ? error_details
+ : "Authentication metadata processing failed.";
+ message = gpr_slice_from_copied_string(error_details);
grpc_sopb_reset(calld->recv_ops);
- grpc_transport_stream_op_add_close(&calld->transport_op,
- GRPC_STATUS_UNAUTHENTICATED, &message);
+ grpc_transport_stream_op_add_close(&calld->transport_op, status, &message);
grpc_call_next_op(elem, &calld->transport_op);
}
grpc_metadata_array_destroy(&calld->md);
@@ -212,8 +222,7 @@ static void init_call_elem(grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
-}
+static void destroy_call_elem(grpc_call_element *elem) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
diff --git a/src/core/statistics/census_interface.h b/src/core/statistics/census_interface.h
index eb4349c311..ac1ff24866 100644
--- a/src/core/statistics/census_interface.h
+++ b/src/core/statistics/census_interface.h
@@ -73,4 +73,4 @@ census_op_id census_tracing_start_op(void);
/* Ends tracing. Calling this function will invalidate the input op_id. */
void census_tracing_end_op(census_op_id op_id);
-#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_INTERFACE_H */
+#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_INTERFACE_H */
diff --git a/src/core/statistics/census_log.h b/src/core/statistics/census_log.h
index 06869b7a33..60b6d597df 100644
--- a/src/core/statistics/census_log.h
+++ b/src/core/statistics/census_log.h
@@ -88,4 +88,4 @@ size_t census_log_remaining_space(void);
out-of-space. */
int census_log_out_of_space_count(void);
-#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_LOG_H */
+#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_LOG_H */
diff --git a/src/core/statistics/census_rpc_stats.c b/src/core/statistics/census_rpc_stats.c
index 3e571b1143..b836987cf0 100644
--- a/src/core/statistics/census_rpc_stats.c
+++ b/src/core/statistics/census_rpc_stats.c
@@ -85,8 +85,8 @@ static void delete_key(void* key) { gpr_free(key); }
static const census_ht_option ht_opt = {
CENSUS_HT_POINTER /* key type */, 1999 /* n_of_buckets */,
- simple_hash /* hash function */, cmp_str_keys /* key comparator */,
- delete_stats /* data deleter */, delete_key /* key deleter */
+ simple_hash /* hash function */, cmp_str_keys /* key comparator */,
+ delete_stats /* data deleter */, delete_key /* key deleter */
};
static void init_rpc_stats(void* stats) {
diff --git a/src/core/statistics/census_rpc_stats.h b/src/core/statistics/census_rpc_stats.h
index 9336dce1f8..aec31c1971 100644
--- a/src/core/statistics/census_rpc_stats.h
+++ b/src/core/statistics/census_rpc_stats.h
@@ -98,4 +98,4 @@ void census_stats_store_shutdown(void);
}
#endif
-#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_RPC_STATS_H */
+#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_RPC_STATS_H */
diff --git a/src/core/statistics/census_tracing.c b/src/core/statistics/census_tracing.c
index 3036ba5407..f2a09dc06e 100644
--- a/src/core/statistics/census_tracing.c
+++ b/src/core/statistics/census_tracing.c
@@ -60,8 +60,11 @@ static void delete_trace_obj(void* obj) {
}
static const census_ht_option ht_opt = {
- CENSUS_HT_UINT64 /* key type*/, 571 /* n_of_buckets */, NULL /* hash */,
- NULL /* compare_keys */, delete_trace_obj /* delete data */,
+ CENSUS_HT_UINT64 /* key type*/,
+ 571 /* n_of_buckets */,
+ NULL /* hash */,
+ NULL /* compare_keys */,
+ delete_trace_obj /* delete data */,
NULL /* delete key */
};
diff --git a/src/core/statistics/census_tracing.h b/src/core/statistics/census_tracing.h
index a4494b510c..08305c2469 100644
--- a/src/core/statistics/census_tracing.h
+++ b/src/core/statistics/census_tracing.h
@@ -93,4 +93,4 @@ census_trace_obj** census_get_active_ops(int* num_active_ops);
}
#endif
-#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_TRACING_H */
+#endif /* GRPC_INTERNAL_CORE_STATISTICS_CENSUS_TRACING_H */
diff --git a/src/core/statistics/hash_table.h b/src/core/statistics/hash_table.h
index 7bcb4bcd9b..b7f8e11af4 100644
--- a/src/core/statistics/hash_table.h
+++ b/src/core/statistics/hash_table.h
@@ -128,4 +128,4 @@ typedef void (*census_ht_itr_cb)(census_ht_key key, const void* val_ptr,
should not invalidate data entries. */
gpr_uint64 census_ht_for_all(const census_ht* ht, census_ht_itr_cb);
-#endif /* GRPC_INTERNAL_CORE_STATISTICS_HASH_TABLE_H */
+#endif /* GRPC_INTERNAL_CORE_STATISTICS_HASH_TABLE_H */
diff --git a/src/core/support/cpu_iphone.c b/src/core/support/cpu_iphone.c
index d412a6d7ee..82b49b47bc 100644
--- a/src/core/support/cpu_iphone.c
+++ b/src/core/support/cpu_iphone.c
@@ -36,9 +36,7 @@
#ifdef GPR_CPU_IPHONE
/* Probably 2 instead of 1, but see comment on gpr_cpu_current_cpu. */
-unsigned gpr_cpu_num_cores(void) {
- return 1;
-}
+unsigned gpr_cpu_num_cores(void) { return 1; }
/* Most code that's using this is using it to shard across work queues. So
unless profiling shows it's a problem or there appears a way to detect the
@@ -46,8 +44,6 @@ unsigned gpr_cpu_num_cores(void) {
Note that the interface in cpu.h lets gpr_cpu_num_cores return 0, but doing
it makes it impossible for gpr_cpu_current_cpu to satisfy its stated range,
and some code might be relying on it. */
-unsigned gpr_cpu_current_cpu(void) {
- return 0;
-}
+unsigned gpr_cpu_current_cpu(void) { return 0; }
#endif /* GPR_CPU_IPHONE */
diff --git a/src/core/support/cpu_linux.c b/src/core/support/cpu_linux.c
index 282d4daab1..7af6a8f009 100644
--- a/src/core/support/cpu_linux.c
+++ b/src/core/support/cpu_linux.c
@@ -33,7 +33,7 @@
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
-#endif /* _GNU_SOURCE */
+#endif /* _GNU_SOURCE */
#include <grpc/support/port_platform.h>
diff --git a/src/core/support/env.h b/src/core/support/env.h
index 4f2e394d14..24172d8673 100644
--- a/src/core/support/env.h
+++ b/src/core/support/env.h
@@ -57,4 +57,4 @@ void gpr_setenv(const char *name, const char *value);
}
#endif
-#endif /* GRPC_INTERNAL_CORE_SUPPORT_ENV_H */
+#endif /* GRPC_INTERNAL_CORE_SUPPORT_ENV_H */
diff --git a/src/core/support/file.h b/src/core/support/file.h
index 1dafe390e3..d8b7cea44f 100644
--- a/src/core/support/file.h
+++ b/src/core/support/file.h
@@ -60,4 +60,4 @@ FILE *gpr_tmpfile(const char *prefix, char **tmp_filename);
}
#endif
-#endif /* GRPC_INTERNAL_CORE_SUPPORT_FILE_H */
+#endif /* GRPC_INTERNAL_CORE_SUPPORT_FILE_H */
diff --git a/src/core/support/histogram.c b/src/core/support/histogram.c
index 9029703891..78dbf98684 100644
--- a/src/core/support/histogram.c
+++ b/src/core/support/histogram.c
@@ -191,15 +191,18 @@ static double threshold_for_count_below(gpr_histogram *h, double count_below) {
break;
}
}
- return (bucket_start(h, (double)lower_idx) + bucket_start(h, (double)upper_idx)) / 2.0;
+ return (bucket_start(h, (double)lower_idx) +
+ bucket_start(h, (double)upper_idx)) /
+ 2.0;
} else {
/* treat values as uniform throughout the bucket, and find where this value
should lie */
lower_bound = bucket_start(h, (double)lower_idx);
upper_bound = bucket_start(h, (double)(lower_idx + 1));
- return GPR_CLAMP(upper_bound - (upper_bound - lower_bound) *
- (count_so_far - count_below) /
- h->buckets[lower_idx],
+ return GPR_CLAMP(upper_bound -
+ (upper_bound - lower_bound) *
+ (count_so_far - count_below) /
+ h->buckets[lower_idx],
h->min_seen, h->max_seen);
}
}
diff --git a/src/core/support/log_linux.c b/src/core/support/log_linux.c
index 5ac36e7b95..02f64d8b7e 100644
--- a/src/core/support/log_linux.c
+++ b/src/core/support/log_linux.c
@@ -93,8 +93,8 @@ void gpr_default_log(gpr_log_func_args *args) {
}
gpr_asprintf(&prefix, "%s%s.%09d %7tu %s:%d]",
- gpr_log_severity_string(args->severity), time_buffer,
- (int)(now.tv_nsec), gettid(), display_file, args->line);
+ gpr_log_severity_string(args->severity), time_buffer,
+ (int)(now.tv_nsec), gettid(), display_file, args->line);
fprintf(stderr, "%-60s %s\n", prefix, args->message);
gpr_free(prefix);
diff --git a/src/core/support/murmur_hash.h b/src/core/support/murmur_hash.h
index 85ab2fe4bf..343fcb99f7 100644
--- a/src/core/support/murmur_hash.h
+++ b/src/core/support/murmur_hash.h
@@ -41,4 +41,4 @@
/* compute the hash of key (length len) */
gpr_uint32 gpr_murmur_hash3(const void *key, size_t len, gpr_uint32 seed);
-#endif /* GRPC_INTERNAL_CORE_SUPPORT_MURMUR_HASH_H */
+#endif /* GRPC_INTERNAL_CORE_SUPPORT_MURMUR_HASH_H */
diff --git a/src/core/support/slice.c b/src/core/support/slice.c
index e4196a48c6..53024e88f1 100644
--- a/src/core/support/slice.c
+++ b/src/core/support/slice.c
@@ -284,7 +284,8 @@ gpr_slice gpr_slice_split_head(gpr_slice *source, size_t split) {
head.refcount = NULL;
head.data.inlined.length = (gpr_uint8)split;
memcpy(head.data.inlined.bytes, source->data.inlined.bytes, split);
- source->data.inlined.length = (gpr_uint8)(source->data.inlined.length - split);
+ source->data.inlined.length =
+ (gpr_uint8)(source->data.inlined.length - split);
memmove(source->data.inlined.bytes, source->data.inlined.bytes + split,
source->data.inlined.length);
} else if (split < sizeof(head.data.inlined.bytes)) {
diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c
index 6e6c72a2bf..987d5cb9b5 100644
--- a/src/core/support/slice_buffer.c
+++ b/src/core/support/slice_buffer.c
@@ -116,7 +116,8 @@ void gpr_slice_buffer_add(gpr_slice_buffer *sb, gpr_slice s) {
GPR_SLICE_INLINED_SIZE) {
memcpy(back->data.inlined.bytes + back->data.inlined.length,
s.data.inlined.bytes, s.data.inlined.length);
- back->data.inlined.length = (gpr_uint8)(back->data.inlined.length + s.data.inlined.length);
+ back->data.inlined.length =
+ (gpr_uint8)(back->data.inlined.length + s.data.inlined.length);
} else {
size_t cp1 = GPR_SLICE_INLINED_SIZE - back->data.inlined.length;
memcpy(back->data.inlined.bytes + back->data.inlined.length,
diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c
index bc741f8c70..27ecf62280 100644
--- a/src/core/support/stack_lockfree.c
+++ b/src/core/support/stack_lockfree.c
@@ -67,7 +67,7 @@ typedef union lockfree_node {
#define ENTRY_ALIGNMENT_BITS 3 /* make sure that entries aligned to 8-bytes */
#define INVALID_ENTRY_INDEX \
((1 << 16) - 1) /* reserve this entry as invalid \
- */
+ */
struct gpr_stack_lockfree {
lockfree_node *entries;
@@ -75,7 +75,7 @@ struct gpr_stack_lockfree {
#ifndef NDEBUG
/* Bitmap of pushed entries to check for double-push or pop */
- gpr_atm pushed[(INVALID_ENTRY_INDEX+1)/(8*sizeof(gpr_atm))];
+ gpr_atm pushed[(INVALID_ENTRY_INDEX + 1) / (8 * sizeof(gpr_atm))];
#endif
};
@@ -123,13 +123,13 @@ int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) {
#ifndef NDEBUG
/* Check for double push */
{
- int pushed_index = entry / (8*sizeof(gpr_atm));
- int pushed_bit = entry % (8*sizeof(gpr_atm));
+ int pushed_index = entry / (8 * sizeof(gpr_atm));
+ int pushed_bit = entry % (8 * sizeof(gpr_atm));
gpr_atm old_val;
old_val = gpr_atm_no_barrier_fetch_add(&stack->pushed[pushed_index],
- (gpr_atm)(1UL << pushed_bit));
- GPR_ASSERT((old_val & (1UL<<pushed_bit)) == 0);
+ (gpr_atm)(1UL << pushed_bit));
+ GPR_ASSERT((old_val & (1UL << pushed_bit)) == 0);
}
#endif
@@ -161,13 +161,13 @@ int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) {
#ifndef NDEBUG
/* Check for valid pop */
{
- int pushed_index = head.contents.index / (8*sizeof(gpr_atm));
- int pushed_bit = head.contents.index % (8*sizeof(gpr_atm));
+ int pushed_index = head.contents.index / (8 * sizeof(gpr_atm));
+ int pushed_bit = head.contents.index % (8 * sizeof(gpr_atm));
gpr_atm old_val;
old_val = gpr_atm_no_barrier_fetch_add(&stack->pushed[pushed_index],
- -(gpr_atm)(1UL << pushed_bit));
- GPR_ASSERT((old_val & (1UL<<pushed_bit)) != 0);
+ -(gpr_atm)(1UL << pushed_bit));
+ GPR_ASSERT((old_val & (1UL << pushed_bit)) != 0);
}
#endif
diff --git a/src/core/support/string.c b/src/core/support/string.c
index 9babbd910a..af0389ea83 100644
--- a/src/core/support/string.c
+++ b/src/core/support/string.c
@@ -125,7 +125,6 @@ char *gpr_dump_slice(gpr_slice s, gpr_uint32 flags) {
flags);
}
-
int gpr_parse_bytes_to_uint32(const char *buf, size_t len, gpr_uint32 *result) {
gpr_uint32 out = 0;
gpr_uint32 new;
@@ -187,9 +186,9 @@ char *gpr_strjoin_sep(const char **strs, size_t nstrs, const char *sep,
for (i = 0; i < nstrs; i++) {
out_length += strlen(strs[i]);
}
- out_length += 1; /* null terminator */
+ out_length += 1; /* null terminator */
if (nstrs > 0) {
- out_length += sep_len * (nstrs - 1); /* separators */
+ out_length += sep_len * (nstrs - 1); /* separators */
}
out = gpr_malloc(out_length);
out_length = 0;
@@ -214,10 +213,8 @@ char *gpr_strjoin_sep(const char **strs, size_t nstrs, const char *sep,
* str.
*
* Returns 1 and updates \a begin and \a end. Returns 0 otherwise. */
-static int slice_find_separator_offset(const gpr_slice str,
- const char *sep,
- const size_t read_offset,
- size_t *begin,
+static int slice_find_separator_offset(const gpr_slice str, const char *sep,
+ const size_t read_offset, size_t *begin,
size_t *end) {
size_t i;
const gpr_uint8 *str_ptr = GPR_SLICE_START_PTR(str) + read_offset;
@@ -255,9 +252,7 @@ void gpr_slice_split(gpr_slice str, const char *sep, gpr_slice_buffer *dst) {
}
}
-void gpr_strvec_init(gpr_strvec *sv) {
- memset(sv, 0, sizeof(*sv));
-}
+void gpr_strvec_init(gpr_strvec *sv) { memset(sv, 0, sizeof(*sv)); }
void gpr_strvec_destroy(gpr_strvec *sv) {
size_t i;
@@ -270,11 +265,11 @@ void gpr_strvec_destroy(gpr_strvec *sv) {
void gpr_strvec_add(gpr_strvec *sv, char *str) {
if (sv->count == sv->capacity) {
sv->capacity = GPR_MAX(sv->capacity + 8, sv->capacity * 2);
- sv->strs = gpr_realloc(sv->strs, sizeof(char*) * sv->capacity);
+ sv->strs = gpr_realloc(sv->strs, sizeof(char *) * sv->capacity);
}
sv->strs[sv->count++] = str;
}
char *gpr_strvec_flatten(gpr_strvec *sv, size_t *final_length) {
- return gpr_strjoin((const char**)sv->strs, sv->count, final_length);
+ return gpr_strjoin((const char **)sv->strs, sv->count, final_length);
}
diff --git a/src/core/support/string.h b/src/core/support/string.h
index 3ac4abeef8..a28e00fd3e 100644
--- a/src/core/support/string.h
+++ b/src/core/support/string.h
@@ -47,7 +47,7 @@ extern "C" {
/* String utility functions */
/* Flags for gpr_dump function. */
-#define GPR_DUMP_HEX 0x00000001
+#define GPR_DUMP_HEX 0x00000001
#define GPR_DUMP_ASCII 0x00000002
/* Converts array buf, of length len, into a C string according to the flags.
@@ -108,4 +108,4 @@ char *gpr_strvec_flatten(gpr_strvec *strs, size_t *total_length);
}
#endif
-#endif /* GRPC_INTERNAL_CORE_SUPPORT_STRING_H */
+#endif /* GRPC_INTERNAL_CORE_SUPPORT_STRING_H */
diff --git a/src/core/support/string_win32.c b/src/core/support/string_win32.c
index 27b9f3637a..8ffb0a225e 100644
--- a/src/core/support/string_win32.c
+++ b/src/core/support/string_win32.c
@@ -99,13 +99,9 @@ LPSTR gpr_tchar_to_char(LPCTSTR input) {
return ret;
}
#else
-char *gpr_tchar_to_char(LPTSTR input) {
- return gpr_strdup(input);
-}
+char *gpr_tchar_to_char(LPTSTR input) { return gpr_strdup(input); }
-char *gpr_char_to_tchar(LPTSTR input) {
- return gpr_strdup(input);
-}
+char *gpr_char_to_tchar(LPTSTR input) { return gpr_strdup(input); }
#endif
#endif /* GPR_WIN32 */
diff --git a/src/core/support/string_win32.h b/src/core/support/string_win32.h
index 1260aa55c1..e3043656fb 100644
--- a/src/core/support/string_win32.h
+++ b/src/core/support/string_win32.h
@@ -42,6 +42,6 @@
LPTSTR gpr_char_to_tchar(LPCSTR input);
LPSTR gpr_tchar_to_char(LPCTSTR input);
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32 */
-#endif /* GRPC_INTERNAL_CORE_SUPPORT_STRING_WIN32_H */
+#endif /* GRPC_INTERNAL_CORE_SUPPORT_STRING_WIN32_H */
diff --git a/src/core/support/sync_posix.c b/src/core/support/sync_posix.c
index 61572b9a8e..6f078cd4bb 100644
--- a/src/core/support/sync_posix.c
+++ b/src/core/support/sync_posix.c
@@ -63,7 +63,8 @@ void gpr_cv_destroy(gpr_cv *cv) { GPR_ASSERT(pthread_cond_destroy(cv) == 0); }
int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) {
int err = 0;
- if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) == 0) {
+ if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) ==
+ 0) {
err = pthread_cond_wait(cv, mu);
} else {
struct timespec abs_deadline_ts;
diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c
index 54f84a46ac..df23492171 100644
--- a/src/core/support/sync_win32.c
+++ b/src/core/support/sync_win32.c
@@ -83,7 +83,8 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) {
int timeout = 0;
DWORD timeout_max_ms;
mu->locked = 0;
- if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) == 0) {
+ if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) ==
+ 0) {
SleepConditionVariableCS(cv, &mu->cs, INFINITE);
} else {
gpr_timespec now = gpr_now(abs_deadline.clock_type);
diff --git a/src/core/support/thd.c b/src/core/support/thd.c
index ec308f3119..32c0db5b66 100644
--- a/src/core/support/thd.c
+++ b/src/core/support/thd.c
@@ -37,9 +37,7 @@
#include <grpc/support/thd.h>
-enum {
- GPR_THD_JOINABLE = 1
-};
+enum { GPR_THD_JOINABLE = 1 };
gpr_thd_options gpr_thd_options_default(void) {
gpr_thd_options options;
diff --git a/src/core/support/thd_internal.h b/src/core/support/thd_internal.h
index 4683c37742..1508c4691f 100644
--- a/src/core/support/thd_internal.h
+++ b/src/core/support/thd_internal.h
@@ -36,4 +36,4 @@
/* Internal interfaces between modules within the gpr support library. */
-#endif /* GRPC_INTERNAL_CORE_SUPPORT_THD_INTERNAL_H */
+#endif /* GRPC_INTERNAL_CORE_SUPPORT_THD_INTERNAL_H */
diff --git a/src/core/support/thd_posix.c b/src/core/support/thd_posix.c
index fa4eb50556..c36d94d044 100644
--- a/src/core/support/thd_posix.c
+++ b/src/core/support/thd_posix.c
@@ -69,9 +69,11 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
GPR_ASSERT(pthread_attr_init(&attr) == 0);
if (gpr_thd_options_is_detached(options)) {
- GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0);
+ GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ==
+ 0);
} else {
- GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0);
+ GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
+ 0);
}
thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0);
GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
@@ -82,12 +84,8 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
return thread_started;
}
-gpr_thd_id gpr_thd_currentid(void) {
- return (gpr_thd_id)pthread_self();
-}
+gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); }
-void gpr_thd_join(gpr_thd_id t) {
- pthread_join((pthread_t)t, NULL);
-}
+void gpr_thd_join(gpr_thd_id t) { pthread_join((pthread_t)t, NULL); }
#endif /* GPR_POSIX_SYNC */
diff --git a/src/core/support/thd_win32.c b/src/core/support/thd_win32.c
index 4fa3907444..a9db180c1b 100644
--- a/src/core/support/thd_win32.c
+++ b/src/core/support/thd_win32.c
@@ -105,9 +105,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
return handle != NULL;
}
-gpr_thd_id gpr_thd_currentid(void) {
- return (gpr_thd_id)g_thd_info;
-}
+gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; }
void gpr_thd_join(gpr_thd_id t) {
struct thd_info *info = (struct thd_info *)t;
diff --git a/src/core/support/time.c b/src/core/support/time.c
index b523ae01cc..929adac918 100644
--- a/src/core/support/time.c
+++ b/src/core/support/time.c
@@ -315,5 +315,6 @@ gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type clock_type) {
return gpr_time_add(gpr_now(clock_type), t);
}
- return gpr_time_add(gpr_now(clock_type), gpr_time_sub(t, gpr_now(t.clock_type)));
+ return gpr_time_add(gpr_now(clock_type),
+ gpr_time_sub(t, gpr_now(t.clock_type)));
}
diff --git a/src/core/support/tls_pthread.c b/src/core/support/tls_pthread.c
index f2e76a553f..2d28226fc4 100644
--- a/src/core/support/tls_pthread.c
+++ b/src/core/support/tls_pthread.c
@@ -38,7 +38,7 @@
#include <grpc/support/tls.h>
gpr_intptr gpr_tls_set(struct gpr_pthread_thread_local *tls, gpr_intptr value) {
- GPR_ASSERT(0 == pthread_setspecific(tls->key, (void*)value));
+ GPR_ASSERT(0 == pthread_setspecific(tls->key, (void *)value));
return value;
}
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
index f01958984f..2c3b22d24e 100644
--- a/src/core/surface/byte_buffer_queue.h
+++ b/src/core/surface/byte_buffer_queue.h
@@ -59,4 +59,4 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q);
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q);
-#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */
+#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 6a1a6cbf30..33f277da46 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -39,6 +39,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
@@ -242,6 +243,9 @@ struct grpc_call {
/* Compression algorithm for the call */
grpc_compression_algorithm compression_algorithm;
+ /* Supported encodings (compression algorithms), a bitset */
+ gpr_uint32 encodings_accepted_by_peer;
+
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -272,7 +276,8 @@ struct grpc_call {
/** completion events - for completion queue use */
grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
- /** siblings: children of the same parent form a list, and this list is protected under
+ /** siblings: children of the same parent form a list, and this list is
+ protected under
parent->mu */
grpc_call *sibling_next;
grpc_call *sibling_prev;
@@ -394,7 +399,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
} else {
call->sibling_next = parent_call->first_child;
call->sibling_prev = parent_call->first_child->sibling_prev;
- call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call;
+ call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
+ call;
}
gpr_mu_unlock(&parent_call->mu);
@@ -532,6 +538,45 @@ grpc_compression_algorithm grpc_call_get_compression_algorithm(
return call->compression_algorithm;
}
+static void set_encodings_accepted_by_peer(
+ grpc_call *call, const gpr_slice accept_encoding_slice) {
+ size_t i;
+ grpc_compression_algorithm algorithm;
+ gpr_slice_buffer accept_encoding_parts;
+
+ gpr_slice_buffer_init(&accept_encoding_parts);
+ gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts);
+
+ /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
+ * zeroes the whole grpc_call */
+ /* Always support no compression */
+ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
+ for (i = 0; i < accept_encoding_parts.count; i++) {
+ const gpr_slice *accept_encoding_entry_slice =
+ &accept_encoding_parts.slices[i];
+ if (grpc_compression_algorithm_parse(
+ (const char *)GPR_SLICE_START_PTR(*accept_encoding_entry_slice),
+ GPR_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) {
+ GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
+ } else {
+ char *accept_encoding_entry_str =
+ gpr_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII);
+ gpr_log(GPR_ERROR,
+ "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
+ accept_encoding_entry_str);
+ gpr_free(accept_encoding_entry_str);
+ }
+ }
+}
+
+gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call) {
+ return call->encodings_accepted_by_peer;
+}
+
+gpr_uint32 grpc_call_get_message_flags(const grpc_call *call) {
+ return call->incoming_message_flags;
+}
+
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
@@ -1280,7 +1325,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
const char *description,
void *reserved) {
grpc_call_error r;
- (void) reserved;
+ (void)reserved;
lock(c);
r = cancel_with_status(c, status, description);
unlock(c);
@@ -1408,10 +1453,11 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) {
void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
if (user_data) {
algorithm =
- ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+ ((grpc_compression_algorithm)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
- if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) {
+ if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
+ &algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
assert(0);
}
@@ -1440,6 +1486,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
} else if (key ==
grpc_channel_get_compression_algorithm_string(call->channel)) {
set_compression_algorithm(call, decode_compression(md));
+ } else if (key == grpc_channel_get_encodings_accepted_by_peer_string(
+ call->channel)) {
+ set_encodings_accepted_by_peer(call, md->value->slice);
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
@@ -1524,7 +1573,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
const grpc_op *op;
grpc_ioreq *req;
void (*finish_func)(grpc_call *, int, void *) = finish_batch;
- GPR_ASSERT(!reserved);
+
+ if (reserved != NULL) return GRPC_CALL_ERROR;
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
@@ -1539,12 +1589,13 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
/* rewrite batch ops into ioreq ops */
for (in = 0, out = 0; in < nops; in++) {
op = &ops[in];
+ if (op->reserved != NULL) return GRPC_CALL_ERROR;
switch (op->op) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* Flag validation: currently allow no flags */
if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req->data.send_metadata.count = op->data.send_initial_metadata.count;
req->data.send_metadata.metadata =
@@ -1559,7 +1610,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_INVALID_MESSAGE;
}
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
req->flags = op->flags;
@@ -1571,7 +1622,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_CLOSE;
req->flags = op->flags;
break;
@@ -1582,7 +1633,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_CLIENT;
}
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
req->flags = op->flags;
req->data.send_metadata.count =
@@ -1590,7 +1641,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->data.send_metadata.metadata =
op->data.send_status_from_server.trailing_metadata;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_STATUS;
req->data.send_status.code = op->data.send_status_from_server.status;
req->data.send_status.details =
@@ -1600,7 +1651,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.send_status_from_server.status_details, 0)
: NULL;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_CLOSE;
break;
case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1610,7 +1661,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
req->data.recv_metadata->count = 0;
@@ -1620,7 +1671,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
/* Flag validation: currently allow no flags */
if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_MESSAGE;
req->data.recv_message = op->data.recv_message;
req->flags = op->flags;
@@ -1632,26 +1683,26 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_STATUS;
req->flags = op->flags;
req->data.recv_status.set_value = set_status_value_directly;
req->data.recv_status.user_data = op->data.recv_status_on_client.status;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_STATUS_DETAILS;
req->data.recv_status_details.details =
op->data.recv_status_on_client.status_details;
req->data.recv_status_details.details_capacity =
op->data.recv_status_on_client.status_details_capacity;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
req->data.recv_metadata =
op->data.recv_status_on_client.trailing_metadata;
req->data.recv_metadata->count = 0;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;
break;
@@ -1659,14 +1710,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
/* Flag validation: currently allow no flags */
if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_STATUS;
req->flags = op->flags;
req->data.recv_status.set_value = set_cancelled_value;
req->data.recv_status.user_data =
op->data.recv_close_on_server.cancelled;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;
break;
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 75bdbce980..00638e43b5 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -38,6 +38,10 @@
#include "src/core/channel/context.h"
#include <grpc/grpc.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/* Primitive operation types - grpc_op's get rewritten into these */
typedef enum {
GRPC_IOREQ_RECV_INITIAL_METADATA,
@@ -162,4 +166,19 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem);
gpr_uint8 grpc_call_is_client(grpc_call *call);
+grpc_compression_algorithm grpc_call_get_compression_algorithm(
+ const grpc_call *call);
+
+gpr_uint32 grpc_call_get_message_flags(const grpc_call *call);
+
+/** Returns a bitset for the encodings (compression algorithms) supported by \a
+ * call's peer.
+ *
+ * To be indexed by grpc_compression_algorithm enum values. */
+gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call);
+
+#ifdef __cplusplus
+}
+#endif
+
#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */
diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c
index 7bf8cafc24..5a3ef1e5f4 100644
--- a/src/core/surface/call_log_batch.c
+++ b/src/core/surface/call_log_batch.c
@@ -41,7 +41,7 @@ int grpc_trace_batch = 0;
static void add_metadata(gpr_strvec *b, const grpc_metadata *md, size_t count) {
size_t i;
- for(i = 0; i < count; i++) {
+ for (i = 0; i < count; i++) {
gpr_strvec_add(b, gpr_strdup("\nkey="));
gpr_strvec_add(b, gpr_strdup(md[i].key));
@@ -113,8 +113,9 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
char *tmp;
size_t i;
gpr_log(file, line, severity,
- "grpc_call_start_batch(call=%p, ops=%p, nops=%d, tag=%p)", call, ops, nops, tag);
- for(i = 0; i < nops; i++) {
+ "grpc_call_start_batch(call=%p, ops=%p, nops=%d, tag=%p)", call, ops,
+ nops, tag);
+ for (i = 0; i < nops; i++) {
tmp = grpc_op_string(&ops[i]);
gpr_log(file, line, severity, "ops[%d]: %s", i, tmp);
gpr_free(tmp);
@@ -123,8 +124,7 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
void grpc_server_log_request_call(char *file, int line,
gpr_log_severity severity,
- grpc_server *server,
- grpc_call **call,
+ grpc_server *server, grpc_call **call,
grpc_call_details *details,
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_bound_to_call,
@@ -133,8 +133,9 @@ void grpc_server_log_request_call(char *file, int line,
gpr_log(file, line, severity,
"grpc_server_request_call(server=%p, call=%p, details=%p, "
"initial_metadata=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
- "tag=%p)", server, call, details, initial_metadata,
- cq_bound_to_call, cq_for_notification, tag);
+ "tag=%p)",
+ server, call, details, initial_metadata, cq_bound_to_call,
+ cq_for_notification, tag);
}
void grpc_server_log_shutdown(char *file, int line, gpr_log_severity severity,
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 308572c634..e50251566d 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -66,6 +66,7 @@ struct grpc_channel {
/** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_compression_algorithm_string;
+ grpc_mdstr *grpc_encodings_accepted_by_peer_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
@@ -104,7 +105,10 @@ grpc_channel *grpc_channel_create_from_filters(
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0);
channel->grpc_compression_algorithm_string =
grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
- channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message", 0);
+ channel->grpc_encodings_accepted_by_peer_string =
+ grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0);
+ channel->grpc_message_string =
+ grpc_mdstr_from_string(mdctx, "grpc-message", 0);
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(i, buf);
@@ -159,7 +163,7 @@ static grpc_call *grpc_channel_create_call_internal(
send_metadata[num_metadata++] = authority_mdelem;
}
- return grpc_call_create(channel, parent_call, propagation_mask, cq, NULL,
+ return grpc_call_create(channel, parent_call, propagation_mask, cq, NULL,
send_metadata, num_metadata, deadline);
}
@@ -175,10 +179,11 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method, 0)),
- host ?
- grpc_mdelem_from_metadata_strings(
- channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string),
- grpc_mdstr_from_string(channel->metadata_context, host, 0)) : NULL,
+ host ? grpc_mdelem_from_metadata_strings(
+ channel->metadata_context,
+ GRPC_MDSTR_REF(channel->authority_string),
+ grpc_mdstr_from_string(channel->metadata_context, host, 0))
+ : NULL,
deadline);
}
@@ -189,9 +194,12 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
rc->path = grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method, 0));
- rc->authority = host ? grpc_mdelem_from_metadata_strings(
- channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string),
- grpc_mdstr_from_string(channel->metadata_context, host, 0)) : NULL;
+ rc->authority =
+ host ? grpc_mdelem_from_metadata_strings(
+ channel->metadata_context,
+ GRPC_MDSTR_REF(channel->authority_string),
+ grpc_mdstr_from_string(channel->metadata_context, host, 0))
+ : NULL;
gpr_mu_lock(&channel->registered_call_mu);
rc->next = channel->registered_calls;
channel->registered_calls = rc;
@@ -206,8 +214,8 @@ grpc_call *grpc_channel_create_registered_call(
registered_call *rc = registered_call_handle;
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
- channel, parent_call, propagation_mask, completion_queue,
- GRPC_MDELEM_REF(rc->path),
+ channel, parent_call, propagation_mask, completion_queue,
+ GRPC_MDELEM_REF(rc->path),
rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline);
}
@@ -230,6 +238,7 @@ static void destroy_channel(void *p, int ok) {
}
GRPC_MDSTR_UNREF(channel->grpc_status_string);
GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string);
+ GRPC_MDSTR_UNREF(channel->grpc_encodings_accepted_by_peer_string);
GRPC_MDSTR_UNREF(channel->grpc_message_string);
GRPC_MDSTR_UNREF(channel->path_string);
GRPC_MDSTR_UNREF(channel->authority_string);
@@ -290,6 +299,11 @@ grpc_mdstr *grpc_channel_get_compression_algorithm_string(
return channel->grpc_compression_algorithm_string;
}
+grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
+ grpc_channel *channel) {
+ return channel->grpc_encodings_accepted_by_peer_string;
+}
+
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
return GRPC_MDELEM_REF(channel->grpc_status_elem[i]);
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 9e0646efaa..f271616f60 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -56,6 +56,8 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_compression_algorithm_string(
grpc_channel *channel);
+grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
+ grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 1223706457..88a7c16598 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -77,9 +77,10 @@ typedef struct {
} state_watcher;
static void delete_state_watcher(state_watcher *w) {
- grpc_channel_element *client_channel_elem =
- grpc_channel_stack_last_element(grpc_channel_get_channel_stack(w->channel));
- grpc_client_channel_del_interested_party(client_channel_elem, grpc_cq_pollset(w->cq));
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ grpc_client_channel_del_interested_party(client_channel_elem,
+ grpc_cq_pollset(w->cq));
GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity");
gpr_mu_destroy(&w->mu);
gpr_free(w);
@@ -166,9 +167,9 @@ void grpc_channel_watch_connectivity_state(
w->tag = tag;
w->channel = channel;
- grpc_alarm_init(
- &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_alarm_init(&w->alarm,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
if (client_channel_elem->filter != &grpc_client_channel_filter) {
gpr_log(GPR_ERROR,
@@ -178,7 +179,8 @@ void grpc_channel_watch_connectivity_state(
grpc_iomgr_add_delayed_callback(&w->on_complete, 1);
} else {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
- grpc_client_channel_add_interested_party(client_channel_elem, grpc_cq_pollset(cq));
+ grpc_client_channel_add_interested_party(client_channel_elem,
+ grpc_cq_pollset(cq));
grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
&w->on_complete);
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 378b3f71a1..b58115a93f 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -167,10 +167,12 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
- gpr_timespec deadline,
- void *reserved) {
+ gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_pollset_worker worker;
+ int first_loop = 1;
+ gpr_timespec now;
+
GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -197,12 +199,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+ now = gpr_now(GPR_CLOCK_MONOTONIC);
+ if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
+ first_loop = 0;
+ grpc_pollset_work(&cc->pollset, &worker, now, deadline);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
@@ -240,6 +245,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker worker;
+ gpr_timespec now;
+ int first_loop = 1;
+
GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -272,8 +280,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
break;
}
if (!add_plucker(cc, tag, &worker)) {
- gpr_log(GPR_DEBUG,
- "Too many outstanding grpc_completion_queue_pluck calls: maximum is %d",
+ gpr_log(GPR_DEBUG,
+ "Too many outstanding grpc_completion_queue_pluck calls: maximum "
+ "is %d",
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
@@ -281,13 +290,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
- if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+ now = gpr_now(GPR_CLOCK_MONOTONIC);
+ if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
+ first_loop = 0;
+ grpc_pollset_work(&cc->pollset, &worker, now, deadline);
del_plucker(cc, tag, &worker);
}
done:
diff --git a/src/core/surface/event_string.h b/src/core/surface/event_string.h
index e8a8f93518..07c474e3a0 100644
--- a/src/core/surface/event_string.h
+++ b/src/core/surface/event_string.h
@@ -39,4 +39,4 @@
/* Returns a string describing an event. Must be later freed with gpr_free() */
char *grpc_event_string(grpc_event *ev);
-#endif /* GRPC_INTERNAL_CORE_SURFACE_EVENT_STRING_H */
+#endif /* GRPC_INTERNAL_CORE_SURFACE_EVENT_STRING_H */
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 442bc72f21..d9044549f2 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -33,8 +33,11 @@
#include <grpc/support/port_platform.h>
+#include <memory.h>
+
#include <grpc/census.h>
#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/time.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/resolver_registry.h"
@@ -49,6 +52,8 @@
#include "src/core/transport/chttp2_transport.h"
#include "src/core/transport/connectivity_state.h"
+#define MAX_PLUGINS 128
+
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
static int g_initializations;
@@ -58,7 +63,23 @@ static void do_basic_init(void) {
g_initializations = 0;
}
+typedef struct grpc_plugin {
+ void (*init)();
+ void (*destroy)();
+} grpc_plugin;
+
+static grpc_plugin g_all_of_the_plugins[MAX_PLUGINS];
+static int g_number_of_plugins = 0;
+
+void grpc_register_plugin(void (*init)(void), void (*destroy)(void)) {
+ GPR_ASSERT(g_number_of_plugins != MAX_PLUGINS);
+ g_all_of_the_plugins[g_number_of_plugins].init = init;
+ g_all_of_the_plugins[g_number_of_plugins].destroy = destroy;
+ g_number_of_plugins++;
+}
+
void grpc_init(void) {
+ int i;
gpr_once_init(&g_basic_init, do_basic_init);
gpr_mu_lock(&g_init_mu);
@@ -87,11 +108,17 @@ void grpc_init(void) {
}
}
grpc_timers_global_init();
+ for (i = 0; i < g_number_of_plugins; i++) {
+ if (g_all_of_the_plugins[i].init != NULL) {
+ g_all_of_the_plugins[i].init();
+ }
+ }
}
gpr_mu_unlock(&g_init_mu);
}
void grpc_shutdown(void) {
+ int i;
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
grpc_iomgr_shutdown();
@@ -99,6 +126,11 @@ void grpc_shutdown(void) {
grpc_timers_global_destroy();
grpc_tracer_shutdown();
grpc_resolver_registry_shutdown();
+ for (i = 0; i < g_number_of_plugins; i++) {
+ if (g_all_of_the_plugins[i].destroy != NULL) {
+ g_all_of_the_plugins[i].destroy();
+ }
+ }
}
gpr_mu_unlock(&g_init_mu);
}
diff --git a/src/core/surface/init.h b/src/core/surface/init.h
index 416874020d..771c30f412 100644
--- a/src/core/surface/init.h
+++ b/src/core/surface/init.h
@@ -37,4 +37,4 @@
void grpc_security_pre_init(void);
int grpc_is_initialized(void);
-#endif /* GRPC_INTERNAL_CORE_SURFACE_INIT_H */
+#endif /* GRPC_INTERNAL_CORE_SURFACE_INIT_H */
diff --git a/src/core/surface/init_unsecure.c b/src/core/surface/init_unsecure.c
index ddb70cef8e..630d564a7d 100644
--- a/src/core/surface/init_unsecure.c
+++ b/src/core/surface/init_unsecure.c
@@ -33,5 +33,4 @@
#include "src/core/surface/init.h"
-void grpc_security_pre_init(void) {
-}
+void grpc_security_pre_init(void) {}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index c4215a2cfb..80704cbf67 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -50,6 +50,8 @@ typedef struct {
typedef struct {
grpc_mdctx *mdctx;
grpc_channel *master;
+ grpc_status_code error_code;
+ const char *error_message;
} channel_data;
static void lame_start_transport_stream_op(grpc_call_element *elem,
@@ -64,11 +66,11 @@ static void lame_start_transport_stream_op(grpc_call_element *elem,
if (op->recv_ops != NULL) {
char tmp[GPR_LTOA_MIN_BUFSIZE];
grpc_metadata_batch mdb;
- gpr_ltoa(GRPC_STATUS_UNKNOWN, tmp);
+ gpr_ltoa(chand->error_code, tmp);
calld->status.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp);
calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message",
- "Rpc sent on a lame channel.");
+ chand->error_message);
calld->status.prev = calld->details.next = NULL;
calld->status.next = &calld->details;
calld->details.prev = &calld->status;
@@ -138,8 +140,21 @@ static const grpc_channel_filter lame_filter = {
"lame-client",
};
-grpc_channel *grpc_lame_client_channel_create(const char *target) {
+#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
+
+grpc_channel *grpc_lame_client_channel_create(const char *target,
+ grpc_status_code error_code,
+ const char *error_message) {
+ grpc_channel *channel;
+ grpc_channel_element *elem;
+ channel_data *chand;
static const grpc_channel_filter *filters[] = {&lame_filter};
- return grpc_channel_create_from_filters(target, filters, 1, NULL,
- grpc_mdctx_create(), 1);
+ channel = grpc_channel_create_from_filters(target, filters, 1, NULL,
+ grpc_mdctx_create(), 1);
+ elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
+ GPR_ASSERT(elem->filter == &lame_filter);
+ chand = (channel_data *)elem->channel_data;
+ chand->error_code = error_code;
+ chand->error_message = error_message;
+ return channel;
}
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index c3150250b8..5b03ba95a7 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -199,13 +199,17 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
if (grpc_find_security_connector_in_args(args) != NULL) {
gpr_log(GPR_ERROR, "Cannot set security context in channel args.");
- return grpc_lame_client_channel_create(target);
+ return grpc_lame_client_channel_create(
+ target, GRPC_STATUS_INVALID_ARGUMENT,
+ "Security connector exists in channel args.");
}
if (grpc_credentials_create_security_connector(
creds, target, args, NULL, &connector, &new_args_from_connector) !=
GRPC_SECURITY_OK) {
- return grpc_lame_client_channel_create(target);
+ return grpc_lame_client_channel_create(
+ target, GRPC_STATUS_INVALID_ARGUMENT,
+ "Failed to create security connector.");
}
mdctx = grpc_mdctx_create();
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index f883275951..1c402418e8 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -712,7 +712,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
chand->server = NULL;
chand->channel = NULL;
chand->path_key = grpc_mdstr_from_string(metadata_context, ":path", 0);
- chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority", 0);
+ chand->authority_key =
+ grpc_mdstr_from_string(metadata_context, ":authority", 0);
chand->next = chand->prev = chand;
chand->registered_methods = NULL;
chand->connectivity_state = GRPC_CHANNEL_IDLE;
@@ -974,6 +975,11 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_transport_perform_op(transport, &op);
}
+void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) {
+ (void) done_arg;
+ gpr_free(storage);
+}
+
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
@@ -985,6 +991,12 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
grpc_cq_begin_op(cq);
+ if (server->shutdown_published) {
+ grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL,
+ gpr_malloc(sizeof(grpc_cq_completion)));
+ gpr_mu_unlock(&server->mu_global);
+ return;
+ }
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
@@ -1134,6 +1146,7 @@ grpc_call_error grpc_server_request_call(
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
grpc_cq_begin_op(cq_for_notification);
+ details->reserved = NULL;
rc->type = BATCH_CALL;
rc->server = server;
rc->tag = tag;
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index 9237eb5a90..fc7ae820f5 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -38,7 +38,7 @@
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
const grpc_channel_filter *filters[] = {&grpc_compress_filter};
- (void) reserved;
+ (void)reserved;
return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters),
args);
}
diff --git a/src/core/surface/surface_trace.h b/src/core/surface/surface_trace.h
index 01302bb5d4..2b4728e2b4 100644
--- a/src/core/surface/surface_trace.h
+++ b/src/core/surface/surface_trace.h
@@ -40,10 +40,10 @@
extern int grpc_surface_trace;
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
- if (grpc_surface_trace) { \
+ if (grpc_surface_trace) { \
char *_ev = grpc_event_string(event); \
gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
gpr_free(_ev); \
}
-#endif /* GRPC_INTERNAL_CORE_SURFACE_SURFACE_TRACE_H */
+#endif /* GRPC_INTERNAL_CORE_SURFACE_SURFACE_TRACE_H */
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 40bf2ebd79..474c3d5ee6 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -92,10 +92,10 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->frame_type = *cur;
switch (p->frame_type) {
case 0:
- p->is_frame_compressed = 0; /* GPR_FALSE */
+ p->is_frame_compressed = 0; /* GPR_FALSE */
break;
case 1:
- p->is_frame_compressed = 1; /* GPR_TRUE */
+ p->is_frame_compressed = 1; /* GPR_TRUE */
break;
default:
gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type);
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index d84960009b..dc5eb18e42 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -177,10 +177,9 @@ void grpc_chttp2_publish_reads(
"parsed", transport_parsing, stream_global, max_recv_bytes,
-(gpr_int64)stream_parsing->incoming_window_delta);
stream_global->incoming_window -= stream_parsing->incoming_window_delta;
- GPR_ASSERT(stream_global->max_recv_bytes >=
- stream_parsing->incoming_window_delta);
- stream_global->max_recv_bytes -=
- stream_parsing->incoming_window_delta;
+ GPR_ASSERT(stream_global->max_recv_bytes >=
+ stream_parsing->incoming_window_delta);
+ stream_global->max_recv_bytes -= stream_parsing->incoming_window_delta;
stream_parsing->incoming_window_delta = 0;
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 0f04169741..1ea697f71e 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -66,6 +66,8 @@ typedef struct {
size_t header_idx;
/* was the last frame emitted a header? (if yes, we'll need a CONTINUATION */
gpr_uint8 last_was_header;
+ /* have we seen a regular (non-colon-prefixed) header yet? */
+ gpr_uint8 seen_regular_header;
/* output stream id */
gpr_uint32 stream_id;
gpr_slice_buffer *output;
@@ -361,6 +363,15 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
gpr_uint32 indices_key;
int should_add_elem;
+ GPR_ASSERT (GPR_SLICE_LENGTH(elem->key->slice) > 0);
+ if (GPR_SLICE_START_PTR(elem->key->slice)[0] != ':') { /* regular header */
+ st->seen_regular_header = 1;
+ } else if (st->seen_regular_header != 0) { /* reserved header */
+ gpr_log(GPR_ERROR,
+ "Reserved header (colon-prefixed) happening after regular ones.");
+ abort();
+ }
+
inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, c->filter_elems);
/* is this elem currently in the decoders table? */
@@ -566,6 +577,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
st.cur_frame_type = NONE;
st.last_was_header = 0;
+ st.seen_regular_header = 0;
st.stream_id = stream_id;
st.output = output;
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 9c3ad7a777..38c6052f9c 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -363,7 +363,7 @@ void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
}
int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
+ grpc_chttp2_stream *s) {
stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
diff --git a/src/core/transport/chttp2/stream_map.c b/src/core/transport/chttp2/stream_map.c
index 0ec2f27291..bd16153ed1 100644
--- a/src/core/transport/chttp2/stream_map.c
+++ b/src/core/transport/chttp2/stream_map.c
@@ -123,8 +123,7 @@ void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src,
dst->values = gpr_realloc(dst->values, dst->capacity * sizeof(void *));
}
memcpy(dst->keys + dst->count, src->keys, src->count * sizeof(gpr_uint32));
- memcpy(dst->values + dst->count, src->values,
- src->count * sizeof(void*));
+ memcpy(dst->values + dst->count, src->values, src->count * sizeof(void *));
dst->count += src->count;
dst->free += src->free;
src->count = 0;
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index b55e81fdca..123061b3fc 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -112,13 +112,18 @@ int grpc_chttp2_unlocking_check_writes(
}
}
- if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) {
- stream_writing->announce_window = stream_global->unannounced_incoming_window;
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
- incoming_window, stream_global->unannounced_incoming_window);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
- unannounced_incoming_window, -(gpr_int64)stream_global->unannounced_incoming_window);
- stream_global->incoming_window += stream_global->unannounced_incoming_window;
+ if (!stream_global->read_closed &&
+ stream_global->unannounced_incoming_window > 0) {
+ stream_writing->announce_window =
+ stream_global->unannounced_incoming_window;
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
+ "write", transport_global, stream_global, incoming_window,
+ stream_global->unannounced_incoming_window);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
+ "write", transport_global, stream_global, unannounced_incoming_window,
+ -(gpr_int64)stream_global->unannounced_incoming_window);
+ stream_global->incoming_window +=
+ stream_global->unannounced_incoming_window;
stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
@@ -179,18 +184,20 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
- if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
+ if (stream_writing->sopb.nops > 0 ||
+ stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
- stream_writing->id, &transport_writing->hpack_compressor,
+ stream_writing->id,
+ &transport_writing->hpack_compressor,
&transport_writing->outbuf);
stream_writing->sopb.nops = 0;
}
if (stream_writing->announce_window > 0) {
gpr_slice_buffer_add(
&transport_writing->outbuf,
- grpc_chttp2_window_update_create(
- stream_writing->id, stream_writing->announce_window));
+ grpc_chttp2_window_update_create(stream_writing->id,
+ stream_writing->announce_window));
stream_writing->announce_window = 0;
}
if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index a9f91b64d5..1bbd210e46 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -116,7 +116,7 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global,
static void add_to_pollset_locked(grpc_chttp2_transport *t,
grpc_pollset *pollset);
static void add_to_pollset_set_locked(grpc_chttp2_transport *t,
- grpc_pollset_set *pollset_set);
+ grpc_pollset_set *pollset_set);
/** Start new streams that have been created if we can */
static void maybe_start_some_streams(
@@ -368,11 +368,10 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->global.outgoing_window =
t->global.settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->global.max_recv_bytes =
- s->parsing.incoming_window =
+ s->global.max_recv_bytes = s->parsing.incoming_window =
s->global.incoming_window =
- t->global.settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ t->global.settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
s->global.in_stream_map = 1;
@@ -580,7 +579,7 @@ static void maybe_start_some_streams(
stream_global->incoming_window =
transport_global->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- stream_global->max_recv_bytes =
+ stream_global->max_recv_bytes =
GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes);
grpc_chttp2_stream_map_add(
&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
@@ -590,7 +589,6 @@ static void maybe_start_some_streams(
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
-
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -648,12 +646,14 @@ static void perform_stream_op_locked(
stream_global->publish_sopb->nops = 0;
stream_global->publish_state = op->recv_state;
if (stream_global->max_recv_bytes < op->max_recv_bytes) {
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global,
- max_recv_bytes, op->max_recv_bytes - stream_global->max_recv_bytes);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
+ "op", transport_global, stream_global, max_recv_bytes,
+ op->max_recv_bytes - stream_global->max_recv_bytes);
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
"op", transport_global, stream_global, unannounced_incoming_window,
op->max_recv_bytes - stream_global->max_recv_bytes);
- stream_global->unannounced_incoming_window += op->max_recv_bytes - stream_global->max_recv_bytes;
+ stream_global->unannounced_incoming_window +=
+ op->max_recv_bytes - stream_global->max_recv_bytes;
stream_global->max_recv_bytes = op->max_recv_bytes;
}
grpc_chttp2_incoming_metadata_live_op_buffer_end(
@@ -1175,7 +1175,7 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t,
}
static void add_to_pollset_set_locked(grpc_chttp2_transport *t,
- grpc_pollset_set *pollset_set) {
+ grpc_pollset_set *pollset_set) {
if (t->ep) {
grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
}
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index 44d32b6cb2..f92e87e9dd 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -133,8 +133,8 @@ static void unlock(grpc_mdctx *ctx) {
case), since otherwise we can be stuck waiting for a garbage collection
that will never happen. */
if (ctx->refs == 0) {
- /* uncomment if you're having trouble diagnosing an mdelem leak to make
- things clearer (slows down destruction a lot, however) */
+/* uncomment if you're having trouble diagnosing an mdelem leak to make
+ things clearer (slows down destruction a lot, however) */
#ifdef GRPC_METADATA_REFCOUNT_DEBUG
gc_mdtab(ctx);
#endif
@@ -311,7 +311,8 @@ static void slice_unref(void *p) {
unlock(ctx);
}
-grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str, int canonicalize_key) {
+grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str,
+ int canonicalize_key) {
if (canonicalize_key) {
size_t len;
size_t i;
@@ -522,9 +523,9 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx,
grpc_mdelem *grpc_mdelem_from_strings(grpc_mdctx *ctx, const char *key,
const char *value) {
- return grpc_mdelem_from_metadata_strings(ctx,
- grpc_mdstr_from_string(ctx, key, 0),
- grpc_mdstr_from_string(ctx, value, 0));
+ return grpc_mdelem_from_metadata_strings(
+ ctx, grpc_mdstr_from_string(ctx, key, 0),
+ grpc_mdstr_from_string(ctx, value, 0));
}
grpc_mdelem *grpc_mdelem_from_slices(grpc_mdctx *ctx, gpr_slice key,
diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h
index 15ef9bb555..a7af49ba55 100644
--- a/src/core/transport/metadata.h
+++ b/src/core/transport/metadata.h
@@ -95,7 +95,8 @@ size_t grpc_mdctx_get_mdtab_free_test_only(grpc_mdctx *mdctx);
/* Constructors for grpc_mdstr instances; take a variety of data types that
clients may have handy */
-grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str, int perform_key_canonicalization);
+grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str,
+ int perform_key_canonicalization);
/* Unrefs the slice. */
grpc_mdstr *grpc_mdstr_from_slice(grpc_mdctx *ctx, gpr_slice slice);
grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *str,
@@ -179,4 +180,4 @@ void grpc_mdctx_unlock(grpc_mdctx *ctx);
#define GRPC_MDSTR_KV_HASH(k_hash, v_hash) (GPR_ROTL((k_hash), 2) ^ (v_hash))
-#endif /* GRPC_INTERNAL_CORE_TRANSPORT_METADATA_H */
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_METADATA_H */
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index 0a9669b0ab..038586d48e 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -203,8 +203,8 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) {
#endif /* NDEBUG */
void grpc_metadata_batch_init(grpc_metadata_batch *batch) {
- batch->list.head = batch->list.tail = batch->garbage.head = batch->garbage.tail =
- NULL;
+ batch->list.head = batch->list.tail = batch->garbage.head =
+ batch->garbage.tail = NULL;
batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
@@ -288,7 +288,7 @@ void grpc_metadata_batch_merge(grpc_metadata_batch *target,
}
void grpc_metadata_batch_move(grpc_metadata_batch *dst,
- grpc_metadata_batch *src) {
+ grpc_metadata_batch *src) {
*dst = *src;
memset(src, 0, sizeof(grpc_metadata_batch));
}
diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c
index 9ce1ddb95e..29127c4269 100644
--- a/src/core/tsi/fake_transport_security.c
+++ b/src/core/tsi/fake_transport_security.c
@@ -121,7 +121,7 @@ static void store32_little_endian(gpr_uint32 value, unsigned char* buf) {
buf[3] = (unsigned char)(value >> 24) & 0xFF;
buf[2] = (unsigned char)(value >> 16) & 0xFF;
buf[1] = (unsigned char)(value >> 8) & 0xFF;
- buf[0] = (unsigned char)(value) & 0xFF;
+ buf[0] = (unsigned char)(value)&0xFF;
}
static void tsi_fake_frame_reset(tsi_fake_frame* frame, int needs_draining) {
@@ -370,7 +370,8 @@ static void fake_protector_destroy(tsi_frame_protector* self) {
static const tsi_frame_protector_vtable frame_protector_vtable = {
fake_protector_protect, fake_protector_protect_flush,
- fake_protector_unprotect, fake_protector_destroy, };
+ fake_protector_unprotect, fake_protector_destroy,
+};
/* --- tsi_handshaker methods implementation. ---*/
@@ -393,7 +394,8 @@ static tsi_result fake_handshaker_get_bytes_to_send_to_peer(
next_message_to_send = TSI_FAKE_HANDSHAKE_MESSAGE_MAX;
}
if (tsi_tracing_enabled) {
- gpr_log(GPR_INFO, "%s prepared %s.", impl->is_client ? "Client" : "Server",
+ gpr_log(GPR_INFO, "%s prepared %s.",
+ impl->is_client ? "Client" : "Server",
tsi_fake_handshake_message_to_string(impl->next_message_to_send));
}
impl->next_message_to_send = next_message_to_send;
@@ -493,7 +495,8 @@ static const tsi_handshaker_vtable handshaker_vtable = {
fake_handshaker_get_result,
fake_handshaker_extract_peer,
fake_handshaker_create_frame_protector,
- fake_handshaker_destroy, };
+ fake_handshaker_destroy,
+};
tsi_handshaker* tsi_create_fake_handshaker(int is_client) {
tsi_fake_handshaker* impl = calloc(1, sizeof(tsi_fake_handshaker));
diff --git a/src/core/tsi/fake_transport_security.h b/src/core/tsi/fake_transport_security.h
index af9730b90e..1fa11349fb 100644
--- a/src/core/tsi/fake_transport_security.h
+++ b/src/core/tsi/fake_transport_security.h
@@ -58,4 +58,4 @@ tsi_frame_protector* tsi_create_fake_protector(
}
#endif
-#endif /* GRPC_INTERNAL_CORE_TSI_FAKE_TRANSPORT_SECURITY_H */
+#endif /* GRPC_INTERNAL_CORE_TSI_FAKE_TRANSPORT_SECURITY_H */
diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c
index 609fc06ed5..0b416f6c9d 100644
--- a/src/core/tsi/ssl_transport_security.c
+++ b/src/core/tsi/ssl_transport_security.c
@@ -43,7 +43,7 @@
#include "src/core/tsi/transport_security.h"
#include <openssl/bio.h>
-#include <openssl/crypto.h> /* For OPENSSL_free */
+#include <openssl/crypto.h> /* For OPENSSL_free */
#include <openssl/err.h>
#include <openssl/ssl.h>
#include <openssl/x509.h>
@@ -54,7 +54,6 @@
#define TSI_SSL_MAX_PROTECTED_FRAME_SIZE_UPPER_BOUND 16384
#define TSI_SSL_MAX_PROTECTED_FRAME_SIZE_LOWER_BOUND 1024
-
/* Putting a macro like this and littering the source file with #if is really
bad practice.
TODO(jboeuf): refactor all the #if / #endif in a separate module. */
@@ -116,7 +115,7 @@ typedef struct {
/* --- Library Initialization. ---*/
static gpr_once init_openssl_once = GPR_ONCE_INIT;
-static gpr_mu *openssl_mutexes = NULL;
+static gpr_mu* openssl_mutexes = NULL;
static void openssl_locking_cb(int mode, int type, const char* file, int line) {
if (mode & CRYPTO_LOCK) {
@@ -195,7 +194,7 @@ static void ssl_info_callback(const SSL* ssl, int where, int ret) {
/* Returns 1 if name looks like an IP address, 0 otherwise.
This is a very rough heuristic as it does not handle IPV6 or things like:
0300.0250.00.01, 0xC0.0Xa8.0x0.0x1, 000030052000001, 0xc0.052000001 */
-static int looks_like_ip_address(const char *name) {
+static int looks_like_ip_address(const char* name) {
size_t i;
size_t dot_count = 0;
size_t num_size = 0;
@@ -215,7 +214,6 @@ static int looks_like_ip_address(const char *name) {
return 1;
}
-
/* Gets the subject CN from an X509 cert. */
static tsi_result ssl_get_x509_common_name(X509* cert, unsigned char** utf8,
size_t* utf8_size) {
@@ -630,7 +628,8 @@ static tsi_result build_alpn_protocol_name_list(
}
/* Safety check. */
if ((current < *protocol_name_list) ||
- ((gpr_uintptr)(current - *protocol_name_list) != *protocol_name_list_length)) {
+ ((gpr_uintptr)(current - *protocol_name_list) !=
+ *protocol_name_list_length)) {
return TSI_INTERNAL_ERROR;
}
return TSI_OK;
@@ -768,7 +767,8 @@ static void ssl_protector_destroy(tsi_frame_protector* self) {
static const tsi_frame_protector_vtable frame_protector_vtable = {
ssl_protector_protect, ssl_protector_protect_flush, ssl_protector_unprotect,
- ssl_protector_destroy, };
+ ssl_protector_destroy,
+};
/* --- tsi_handshaker methods implementation. ---*/
@@ -948,7 +948,8 @@ static const tsi_handshaker_vtable handshaker_vtable = {
ssl_handshaker_get_result,
ssl_handshaker_extract_peer,
ssl_handshaker_create_frame_protector,
- ssl_handshaker_destroy, };
+ ssl_handshaker_destroy,
+};
/* --- tsi_ssl_handshaker_factory common methods. --- */
@@ -1075,9 +1076,11 @@ static void ssl_client_handshaker_factory_destroy(
free(impl);
}
-static int client_handshaker_factory_npn_callback(
- SSL* ssl, unsigned char** out, unsigned char* outlen,
- const unsigned char* in, unsigned int inlen, void* arg) {
+static int client_handshaker_factory_npn_callback(SSL* ssl, unsigned char** out,
+ unsigned char* outlen,
+ const unsigned char* in,
+ unsigned int inlen,
+ void* arg) {
tsi_ssl_client_handshaker_factory* factory =
(tsi_ssl_client_handshaker_factory*)arg;
return select_protocol_list((const unsigned char**)out, outlen,
@@ -1121,7 +1124,7 @@ static void ssl_server_handshaker_factory_destroy(
static int does_entry_match_name(const char* entry, size_t entry_length,
const char* name) {
- const char *dot;
+ const char* dot;
const char* name_subdomain = NULL;
size_t name_length = strlen(name);
size_t name_subdomain_length;
@@ -1153,7 +1156,7 @@ static int does_entry_match_name(const char* entry, size_t entry_length,
if (name_subdomain_length < 2) return 0;
name_subdomain++; /* Starts after the dot. */
name_subdomain_length--;
- entry += 2; /* Remove *. */
+ entry += 2; /* Remove *. */
entry_length -= 2;
dot = strchr(name_subdomain, '.');
if ((dot == NULL) || (dot == &name_subdomain[name_subdomain_length - 1])) {
@@ -1170,7 +1173,7 @@ static int does_entry_match_name(const char* entry, size_t entry_length,
static int ssl_server_handshaker_factory_servername_callback(SSL* ssl, int* ap,
void* arg) {
tsi_ssl_server_handshaker_factory* impl =
- (tsi_ssl_server_handshaker_factory*)arg;
+ (tsi_ssl_server_handshaker_factory*)arg;
size_t i = 0;
const char* servername = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name);
if (servername == NULL || strlen(servername) == 0) {
diff --git a/src/core/tsi/ssl_transport_security.h b/src/core/tsi/ssl_transport_security.h
index 4bf6c81b75..cdf4f294be 100644
--- a/src/core/tsi/ssl_transport_security.h
+++ b/src/core/tsi/ssl_transport_security.h
@@ -170,4 +170,4 @@ int tsi_ssl_peer_matches_name(const tsi_peer* peer, const char* name);
}
#endif
-#endif /* GRPC_INTERNAL_CORE_TSI_SSL_TRANSPORT_SECURITY_H */
+#endif /* GRPC_INTERNAL_CORE_TSI_SSL_TRANSPORT_SECURITY_H */
diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h
index 4cd0ec2cfb..34283f2f9c 100644
--- a/src/core/tsi/transport_security.h
+++ b/src/core/tsi/transport_security.h
@@ -108,4 +108,4 @@ char* tsi_strdup(const char* src); /* Sadly, no strdup in C89. */
}
#endif
-#endif /* GRPC_INTERNAL_CORE_TSI_TRANSPORT_SECURITY_H */
+#endif /* GRPC_INTERNAL_CORE_TSI_TRANSPORT_SECURITY_H */
diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h
index e27e6b9fc9..03a51683a2 100644
--- a/src/core/tsi/transport_security_interface.h
+++ b/src/core/tsi/transport_security_interface.h
@@ -341,4 +341,4 @@ void tsi_handshaker_destroy(tsi_handshaker* self);
}
#endif
-#endif /* GRPC_INTERNAL_CORE_TSI_TRANSPORT_SECURITY_INTERFACE_H */
+#endif /* GRPC_INTERNAL_CORE_TSI_TRANSPORT_SECURITY_INTERFACE_H */