aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Hongyu Chen <hongyu@google.com>2015-08-25 14:44:15 -0700
committerGravatar Hongyu Chen <hongyu@google.com>2015-08-25 14:44:15 -0700
commit011ea49592e71e1db3ef43a094aa9b452ff21e67 (patch)
tree70017b24ed2d3736552025680d4a0f138adf456f /src/core
parenta96ce800a8df5c62ffd264317836ecf3433c4344 (diff)
parent1b481b64be43bd4c7655b25422344a17b2f198d9 (diff)
Merge remote-tracking branch 'upstream/master' into timespec
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/grpc_filter.c (renamed from src/core/channel/census_filter.c)65
-rw-r--r--src/core/census/grpc_filter.h (renamed from src/core/channel/census_filter.h)0
-rw-r--r--src/core/channel/channel_args.c63
-rw-r--r--src/core/channel/channel_args.h20
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c59
-rw-r--r--src/core/iomgr/pollset.h7
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c2
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c2
-rw-r--r--src/core/iomgr/pollset_posix.c15
-rw-r--r--src/core/iomgr/pollset_posix.h6
-rw-r--r--src/core/iomgr/pollset_windows.c10
-rw-r--r--src/core/iomgr/tcp_windows.c1
-rw-r--r--src/core/iomgr/udp_server.c2
-rw-r--r--src/core/security/google_default_credentials.c2
-rw-r--r--src/core/security/security_connector.c20
-rw-r--r--src/core/security/server_auth_filter.c28
-rw-r--r--src/core/support/time_precise.h1
-rw-r--r--src/core/surface/channel_create.c4
-rw-r--r--src/core/surface/completion_queue.c16
-rw-r--r--src/core/surface/secure_channel_create.c4
-rw-r--r--src/core/surface/server.c5
-rw-r--r--src/core/transport/chttp2/stream_encoder.c12
22 files changed, 234 insertions, 110 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/census/grpc_filter.c
index d996c3475e..fbedb35661 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -31,11 +31,13 @@
*
*/
-#include "src/core/channel/census_filter.h"
+#include "src/core/census/grpc_filter.h"
#include <stdio.h>
#include <string.h>
+#include "include/grpc/census.h"
+#include "src/core/census/rpc_stat_id.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/channel/noop_filter.h"
#include "src/core/statistics/census_interface.h"
@@ -47,24 +49,19 @@
typedef struct call_data {
census_op_id op_id;
- census_rpc_stats stats;
+ census_context* ctxt;
gpr_timespec start_ts;
+ int error;
/* recv callback */
grpc_stream_op_buffer* recv_ops;
- void (*on_done_recv)(void* user_data, int success);
- void* recv_user_data;
+ grpc_iomgr_closure* on_done_recv;
} call_data;
typedef struct channel_data {
grpc_mdstr* path_str; /* pointer to meta data str with key == ":path" */
} channel_data;
-static void init_rpc_stats(census_rpc_stats* stats) {
- memset(stats, 0, sizeof(census_rpc_stats));
- stats->cnt = 1;
-}
-
static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb,
call_data* calld,
channel_data* chand) {
@@ -77,8 +74,7 @@ static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb,
if (m->md->key == chand->path_str) {
gpr_log(GPR_DEBUG, "%s",
(const char*)GPR_SLICE_START_PTR(m->md->value->slice));
- census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
- m->md->value->slice));
+ /* Add method tag here */
}
}
}
@@ -95,8 +91,6 @@ static void client_mutate_op(grpc_call_element* elem,
static void client_start_transport_op(grpc_call_element* elem,
grpc_transport_stream_op* op) {
- call_data* calld = elem->call_data;
- GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
client_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
@@ -108,7 +102,7 @@ static void server_on_done_recv(void* ptr, int success) {
if (success) {
extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
}
- calld->on_done_recv(calld->recv_user_data, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
static void server_mutate_op(grpc_call_element* elem,
@@ -118,9 +112,7 @@ static void server_mutate_op(grpc_call_element* elem,
/* substitute our callback for the op callback */
calld->recv_ops = op->recv_ops;
calld->on_done_recv = op->on_done_recv;
- calld->recv_user_data = op->recv_user_data;
- op->on_done_recv = server_on_done_recv;
- op->recv_user_data = elem;
+ op->on_done_recv = calld->on_done_recv;
}
}
@@ -132,35 +124,19 @@ static void server_start_transport_op(grpc_call_element* elem,
grpc_call_next_op(elem, op);
}
-static void channel_op(grpc_channel_element* elem,
- grpc_channel_element* from_elem, grpc_channel_op* op) {
- switch (op->type) {
- case GRPC_TRANSPORT_CLOSED:
- /* TODO(hongyu): Annotate trace information for all calls of the channel
- */
- break;
- default:
- break;
- }
- grpc_channel_next_op(elem, op);
-}
-
static void client_init_call_elem(grpc_call_element* elem,
const void* server_transport_data,
grpc_transport_stream_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
- init_rpc_stats(&d->stats);
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
- d->op_id = census_tracing_start_op();
if (initial_op) client_mutate_op(elem, initial_op);
}
static void client_destroy_call_elem(grpc_call_element* elem) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
- census_record_rpc_client_stats(d->op_id, &d->stats);
- census_tracing_end_op(d->op_id);
+ /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
}
static void server_init_call_elem(grpc_call_element* elem,
@@ -168,29 +144,24 @@ static void server_init_call_elem(grpc_call_element* elem,
grpc_transport_stream_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
- init_rpc_stats(&d->stats);
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
- d->op_id = census_tracing_start_op();
+ /* TODO(hongyu): call census_tracing_start_op here. */
+ grpc_iomgr_closure_init(d->on_done_recv, server_on_done_recv, elem);
if (initial_op) server_mutate_op(elem, initial_op);
}
static void server_destroy_call_elem(grpc_call_element* elem) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
- d->stats.elapsed_time_ms = gpr_timespec_to_micros(
- gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), d->start_ts));
- census_record_rpc_server_stats(d->op_id, &d->stats);
- census_tracing_end_op(d->op_id);
+ /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
}
-static void init_channel_elem(grpc_channel_element* elem,
+static void init_channel_elem(grpc_channel_element* elem, grpc_channel* master,
const grpc_channel_args* args, grpc_mdctx* mdctx,
int is_first, int is_last) {
channel_data* chand = elem->channel_data;
GPR_ASSERT(chand != NULL);
- GPR_ASSERT(!is_first);
- GPR_ASSERT(!is_last);
- chand->path_str = grpc_mdstr_from_string(mdctx, ":path");
+ chand->path_str = grpc_mdstr_from_string(mdctx, ":path", 0);
}
static void destroy_channel_elem(grpc_channel_element* elem) {
@@ -203,22 +174,24 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
const grpc_channel_filter grpc_client_census_filter = {
client_start_transport_op,
- channel_op,
+ grpc_channel_next_op,
sizeof(call_data),
client_init_call_elem,
client_destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
"census-client"};
const grpc_channel_filter grpc_server_census_filter = {
server_start_transport_op,
- channel_op,
+ grpc_channel_next_op,
sizeof(call_data),
server_init_call_elem,
server_destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
"census-server"};
diff --git a/src/core/channel/census_filter.h b/src/core/census/grpc_filter.h
index 1453c05d28..1453c05d28 100644
--- a/src/core/channel/census_filter.h
+++ b/src/core/census/grpc_filter.h
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index c430b56fa2..54ee75af28 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include <string.h>
@@ -146,3 +147,65 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
tmp.value.integer = algorithm;
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
+
+/** Returns 1 if the argument for compression algorithm's enabled states bitset
+ * was found in \a a, returning the arg's value in \a states. Otherwise, returns
+ * 0. */
+static int find_compression_algorithm_states_bitset(
+ const grpc_channel_args *a, int **states_arg) {
+ if (a != NULL) {
+ size_t i;
+ for (i = 0; i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
+ *states_arg = &a->args[i].value.integer;
+ return 1; /* GPR_TRUE */
+ }
+ }
+ }
+ return 0; /* GPR_FALSE */
+}
+
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
+ grpc_channel_args **a,
+ grpc_compression_algorithm algorithm,
+ int state) {
+ int *states_arg;
+ grpc_channel_args *result = *a;
+ const int states_arg_found =
+ find_compression_algorithm_states_bitset(*a, &states_arg);
+
+ if (states_arg_found) {
+ if (state != 0) {
+ GPR_BITSET(states_arg, algorithm);
+ } else {
+ GPR_BITCLEAR(states_arg, algorithm);
+ }
+ } else {
+ /* create a new arg */
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
+ /* all enabled by default */
+ tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
+ if (state != 0) {
+ GPR_BITSET(&tmp.value.integer, algorithm);
+ } else {
+ GPR_BITCLEAR(&tmp.value.integer, algorithm);
+ }
+ result = grpc_channel_args_copy_and_add(*a, &tmp, 1);
+ grpc_channel_args_destroy(*a);
+ *a = result;
+ }
+ return result;
+}
+
+int grpc_channel_args_compression_algorithm_get_states(
+ const grpc_channel_args *a) {
+ int *states_arg;
+ if (find_compression_algorithm_states_bitset(a, &states_arg)) {
+ return *states_arg;
+ } else {
+ return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
+ }
+}
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h
index 7e6ddd3997..06a6012dee 100644
--- a/src/core/channel/channel_args.h
+++ b/src/core/channel/channel_args.h
@@ -67,4 +67,24 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm);
+/** Sets the support for the given compression algorithm. By default, all
+ * compression algorithms are enabled. It's an error to disable an algorithm set
+ * by grpc_channel_args_set_compression_algorithm.
+ *
+ * Returns an instance will the updated algorithm states. The \a a pointer is
+ * modified to point to the returned instance (which may be different from the
+ * input value of \a a). */
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
+ grpc_channel_args **a,
+ grpc_compression_algorithm algorithm,
+ int enabled);
+
+/** Returns the bitset representing the support state (true for enabled, false
+ * for disabled) for compression algorithms.
+ *
+ * The i-th bit of the returned bitset corresponds to the i-th entry in the
+ * grpc_compression_algorithm enum. */
+int grpc_channel_args_compression_algorithm_get_states(
+ const grpc_channel_args *a);
+
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 74584e7e2c..8419873908 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -60,9 +60,12 @@ typedef struct {
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels);
- /** the address that we've 'resolved' */
- struct sockaddr_storage addr;
- int addr_len;
+ /** the addresses that we've 'resolved' */
+ struct sockaddr_storage *addrs;
+ /** the corresponding length of the addresses */
+ int *addrs_len;
+ /** how many elements in \a addrs */
+ size_t num_addrs;
/** mutex guarding the rest of the state */
gpr_mu mu;
@@ -119,17 +122,22 @@ static void sockaddr_next(grpc_resolver *resolver,
static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
- grpc_subchannel *subchannel;
+ grpc_subchannel **subchannels;
grpc_subchannel_args args;
if (r->next_completion != NULL && !r->published) {
+ size_t i;
cfg = grpc_client_config_create();
- memset(&args, 0, sizeof(args));
- args.addr = (struct sockaddr *)&r->addr;
- args.addr_len = r->addr_len;
- subchannel =
- grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args);
- lb_policy = r->lb_policy_factory(&subchannel, 1);
+ subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs);
+ for (i = 0; i < r->num_addrs; i++) {
+ memset(&args, 0, sizeof(args));
+ args.addr = (struct sockaddr *)&r->addrs[i];
+ args.addr_len = r->addrs_len[i];
+ subchannels[i] = grpc_subchannel_factory_create_subchannel(
+ r->subchannel_factory, &args);
+ }
+ lb_policy = r->lb_policy_factory(subchannels, r->num_addrs);
+ gpr_free(subchannels);
grpc_client_config_set_lb_policy(cfg, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "unix");
r->published = 1;
@@ -143,6 +151,8 @@ static void sockaddr_destroy(grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
+ gpr_free(r->addrs);
+ gpr_free(r->addrs_len);
gpr_free(r);
}
@@ -238,13 +248,18 @@ done:
return result;
}
+static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_uri *uri,
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels),
grpc_subchannel_factory *subchannel_factory,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) {
+ size_t i;
+ int errors_found = 0; /* GPR_FALSE */
sockaddr_resolver *r;
+ gpr_slice path_slice;
+ gpr_slice_buffer path_parts;
if (0 != strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported");
@@ -253,7 +268,29 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
- if (!parse(uri, &r->addr, &r->addr_len)) {
+
+ path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing);
+ gpr_slice_buffer_init(&path_parts);
+
+ gpr_slice_split(path_slice, ",", &path_parts);
+ r->num_addrs = path_parts.count;
+ r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs);
+ r->addrs_len = gpr_malloc(sizeof(int) * r->num_addrs);
+
+ for(i = 0; i < r->num_addrs; i++) {
+ grpc_uri ith_uri = *uri;
+ char* part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
+ ith_uri.path = part_str;
+ if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) {
+ errors_found = 1; /* GPR_TRUE */
+ }
+ gpr_free(part_str);
+ if (errors_found) break;
+ }
+
+ gpr_slice_buffer_destroy(&path_parts);
+ gpr_slice_unref(path_slice);
+ if (errors_found) {
gpr_free(r);
return NULL;
}
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index c474e4dbf1..337596cb74 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -74,10 +74,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
not be released by grpc_pollset_work AFTER worker has been destroyed.
- Returns true if some work has been done, and false if the deadline
- expired. */
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec deadline);
+ Tries not to block past deadline. */
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec now, gpr_timespec deadline);
/* Break one polling thread out of polling work for this pollset.
If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 5ea9dd2101..fe66ebed77 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -181,7 +181,7 @@ static void multipoll_with_epoll_pollset_maybe_work(
pfds[1].events = POLLIN;
pfds[1].revents = 0;
- poll_rv = poll(pfds, 2, timeout_ms);
+ poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
if (poll_rv < 0) {
if (errno != EINTR) {
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 001fcecf76..30ee6e24db 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -144,7 +144,7 @@ static void multipoll_with_poll_pollset_maybe_work(
POLLOUT, &watchers[i]);
}
- r = poll(pfds, pfd_count, timeout);
+ r = grpc_poll_function(pfds, pfd_count, timeout);
for (i = 1; i < pfd_count; i++) {
grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index a01f9ff727..6bd1b61f24 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -38,7 +38,6 @@
#include "src/core/iomgr/pollset_posix.h"
#include <errno.h>
-#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
@@ -57,6 +56,8 @@
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
+grpc_poll_function_type grpc_poll_function = poll;
+
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
worker->next->prev = worker->prev;
@@ -89,6 +90,7 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
}
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+ /* pollset->mu already held */
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (specific_worker = p->root_worker.next;
@@ -168,14 +170,10 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec deadline) {
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec now, gpr_timespec deadline) {
/* pollset->mu already held */
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
int added_worker = 0;
- if (gpr_time_cmp(now, deadline) > 0) {
- return 0;
- }
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
@@ -217,7 +215,6 @@ done:
gpr_mu_lock(&pollset->mu);
}
}
- return 1;
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
@@ -456,7 +453,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
/* poll fd count (argument 2) is shortened by one if we have no events
to poll on - such that it only includes the kicker */
- r = poll(pfd, nfds, timeout);
+ r = grpc_poll_function(pfd, nfds, timeout);
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index a3ea353de6..69bd9cca8c 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
+#include <poll.h>
+
#include <grpc/support/sync.h>
#include "src/core/iomgr/wakeup_fd_posix.h"
@@ -118,4 +120,8 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
* be locked) */
int grpc_pollset_has_workers(grpc_pollset *pollset);
+/* override to allow tests to hook poll() usage */
+typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
+extern grpc_poll_function_type grpc_poll_function;
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 8710395ab3..07522c8a0c 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -99,14 +99,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec deadline) {
- gpr_timespec now;
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec now, gpr_timespec deadline) {
int added_worker = 0;
- now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (gpr_time_cmp(now, deadline) > 0) {
- return 0 /* GPR_FALSE */;
- }
worker->next = worker->prev = NULL;
gpr_cv_init(&worker->cv);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
@@ -127,7 +122,6 @@ done:
if (added_worker) {
remove_worker(pollset, worker);
}
- return 1 /* GPR_TRUE */;
}
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 123f46d71d..901793ec43 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -152,6 +152,7 @@ static void on_read(void *tcpp, int from_iocp) {
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message);
}
+ gpr_slice_unref(tcp->read_slice);
status = GRPC_ENDPOINT_CB_ERROR;
} else {
if (info->bytes_transfered != 0) {
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index 16482c08f7..6429c38b28 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -235,8 +235,10 @@ static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) {
rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
sizeof(get_local_ip));
if (rc == 0 && addr->sa_family == AF_INET6) {
+#if !TARGET_OS_IPHONE
rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
sizeof(get_local_ip));
+#endif
}
if (bind(fd, addr, addr_len) < 0) {
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index d6092ece32..3631de867a 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -115,7 +115,7 @@ static int is_stack_running_on_compute_engine(void) {
gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
while (!detector.is_done) {
grpc_pollset_worker worker;
- grpc_pollset_work(&detector.pollset, &worker,
+ grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c
index a354536dcd..ba9ac68c5f 100644
--- a/src/core/security/security_connector.c
+++ b/src/core/security/security_connector.c
@@ -575,6 +575,16 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
if (!check_request_metadata_creds(request_metadata_creds)) {
goto error;
}
+ if (config->pem_root_certs == NULL) {
+ pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs);
+ if (pem_root_certs == NULL || pem_root_certs_size == 0) {
+ gpr_log(GPR_ERROR, "Could not get default pem root certs.");
+ goto error;
+ }
+ } else {
+ pem_root_certs = config->pem_root_certs;
+ pem_root_certs_size = config->pem_root_certs_size;
+ }
c = gpr_malloc(sizeof(grpc_ssl_channel_security_connector));
memset(c, 0, sizeof(grpc_ssl_channel_security_connector));
@@ -590,16 +600,6 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
if (overridden_target_name != NULL) {
c->overridden_target_name = gpr_strdup(overridden_target_name);
}
- if (config->pem_root_certs == NULL) {
- pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs);
- if (pem_root_certs == NULL || pem_root_certs_size == 0) {
- gpr_log(GPR_ERROR, "Could not get default pem root certs.");
- goto error;
- }
- } else {
- pem_root_certs = config->pem_root_certs;
- pem_root_certs_size = config->pem_root_certs_size;
- }
result = tsi_create_ssl_client_handshaker_factory(
config->pem_private_key, config->pem_private_key_size,
config->pem_cert_chain, config->pem_cert_chain_size, pem_root_certs,
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 2f42f01f53..6e831431fa 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -104,24 +104,34 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) {
return md;
}
-static void on_md_processing_done(void *user_data,
- const grpc_metadata *consumed_md,
- size_t num_consumed_md, int success) {
+static void on_md_processing_done(
+ void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
+ const grpc_metadata *response_md, size_t num_response_md,
+ grpc_status_code status, const char *error_details) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- if (success) {
+ /* TODO(jboeuf): Implement support for response_md. */
+ if (response_md != NULL && num_response_md > 0) {
+ gpr_log(GPR_INFO,
+ "response_md in auth metadata processing not supported for now. "
+ "Ignoring...");
+ }
+
+ if (status == GRPC_STATUS_OK) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
grpc_metadata_batch_filter(&calld->md_op->data.metadata, remove_consumed_md,
elem);
- calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1);
} else {
- gpr_slice message = gpr_slice_from_copied_string(
- "Authentication metadata processing failed.");
+ gpr_slice message;
+ error_details = error_details != NULL
+ ? error_details
+ : "Authentication metadata processing failed.";
+ message = gpr_slice_from_copied_string(error_details);
grpc_sopb_reset(calld->recv_ops);
- grpc_transport_stream_op_add_close(&calld->transport_op,
- GRPC_STATUS_UNAUTHENTICATED, &message);
+ grpc_transport_stream_op_add_close(&calld->transport_op, status, &message);
grpc_call_next_op(elem, &calld->transport_op);
}
}
diff --git a/src/core/support/time_precise.h b/src/core/support/time_precise.h
index d40ad12d0c..574ebb8448 100644
--- a/src/core/support/time_precise.h
+++ b/src/core/support/time_precise.h
@@ -63,7 +63,6 @@ static void gpr_precise_clock_init() {
gpr_precise_clock end_cycle;
while (time(NULL) == start)
;
- gpr_precise_clock_now();
gpr_get_cycle_counter(&start_cycle);
while (time(NULL) == start + 1)
;
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 82ddfac757..707251da89 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
+#include "src/core/census/grpc_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
#include "src/core/channel/compress_filter.h"
@@ -165,10 +166,9 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
grpc_mdctx *mdctx = grpc_mdctx_create();
int n = 0;
GPR_ASSERT(!reserved);
- /* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
- } */
+ }
filters[n++] = &grpc_compress_filter;
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 77443a7ae8..b58115a93f 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -170,6 +170,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_pollset_worker worker;
+ int first_loop = 1;
+ gpr_timespec now;
+
GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -196,12 +199,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+ now = gpr_now(GPR_CLOCK_MONOTONIC);
+ if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
+ first_loop = 0;
+ grpc_pollset_work(&cc->pollset, &worker, now, deadline);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
@@ -239,6 +245,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker worker;
+ gpr_timespec now;
+ int first_loop = 1;
+
GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -281,13 +290,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
- if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+ now = gpr_now(GPR_CLOCK_MONOTONIC);
+ if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
+ first_loop = 0;
+ grpc_pollset_work(&cc->pollset, &worker, now, deadline);
del_plucker(cc, tag, &worker);
}
done:
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 5b03ba95a7..eccee24698 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
+#include "src/core/census/grpc_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
#include "src/core/channel/compress_filter.h"
@@ -217,10 +218,9 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
args_copy = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1);
- /* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
- } */
+ }
filters[n++] = &grpc_compress_filter;
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 1c402418e8..292bf6fab8 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -41,7 +41,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-#include "src/core/channel/census_filter.h"
+#include "src/core/census/grpc_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/iomgr.h"
@@ -821,10 +821,9 @@ grpc_server *grpc_server_create_from_filters(
server->channel_filters =
gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
server->channel_filters[0] = &server_surface_filter;
- /* TODO(census): restore this once we rework census filter
if (census_enabled) {
server->channel_filters[1] = &grpc_server_census_filter;
- } */
+ }
for (i = 0; i < filter_count; i++) {
server->channel_filters[i + 1 + census_enabled] = filters[i];
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 0f04169741..1ea697f71e 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -66,6 +66,8 @@ typedef struct {
size_t header_idx;
/* was the last frame emitted a header? (if yes, we'll need a CONTINUATION */
gpr_uint8 last_was_header;
+ /* have we seen a regular (non-colon-prefixed) header yet? */
+ gpr_uint8 seen_regular_header;
/* output stream id */
gpr_uint32 stream_id;
gpr_slice_buffer *output;
@@ -361,6 +363,15 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
gpr_uint32 indices_key;
int should_add_elem;
+ GPR_ASSERT (GPR_SLICE_LENGTH(elem->key->slice) > 0);
+ if (GPR_SLICE_START_PTR(elem->key->slice)[0] != ':') { /* regular header */
+ st->seen_regular_header = 1;
+ } else if (st->seen_regular_header != 0) { /* reserved header */
+ gpr_log(GPR_ERROR,
+ "Reserved header (colon-prefixed) happening after regular ones.");
+ abort();
+ }
+
inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, c->filter_elems);
/* is this elem currently in the decoders table? */
@@ -566,6 +577,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
st.cur_frame_type = NONE;
st.last_was_header = 0;
+ st.seen_regular_header = 0;
st.stream_id = stream_id;
st.output = output;