aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2018-04-11 15:26:56 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2018-04-11 15:26:56 -0700
commit7b011b296e252403cd47dc36a20cb4b8cdcaba5b (patch)
treeda0e08c9bb990fe00d9640739973ab4a67e31d60 /src/core/lib/iomgr
parentf2f5a9a0a75b2eba8d950345aeec89b4275f7b59 (diff)
parent71e8aee5e7861134fa8c5d49d41d569df981e88a (diff)
Merge branch 'master' into fix-time
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc23
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc6
-rw-r--r--src/core/lib/iomgr/socket_utils_linux.cc1
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.cc5
-rw-r--r--src/core/lib/iomgr/socket_utils_uv.cc4
-rw-r--r--src/core/lib/iomgr/tcp_custom.h2
-rw-r--r--src/core/lib/iomgr/tcp_server_custom.cc7
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.cc1
-rw-r--r--src/core/lib/iomgr/tcp_uv.cc41
-rw-r--r--src/core/lib/iomgr/timer_generic.cc10
10 files changed, 59 insertions, 41 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 0ef7c03056..44d8cf2b1e 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -59,7 +59,10 @@
//#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1
#define MAX_EPOLL_EVENTS 100
-#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 1
+// TODO(juanlishen): We use a greater-than-one value here as a workaround fix to
+// a keepalive ping timeout issue. We may want to revert https://github
+// .com/grpc/grpc/pull/14943 once we figure out the root cause.
+#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16
grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false,
"pollable_refcount");
@@ -198,6 +201,7 @@ struct grpc_pollset_worker {
struct grpc_pollset {
gpr_mu mu;
+ gpr_atm worker_count;
pollable* active_pollable;
bool kicked_without_poller;
grpc_closure* shutdown_closure;
@@ -685,6 +689,7 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
gpr_mu_init(&pollset->mu);
+ gpr_atm_no_barrier_store(&pollset->worker_count, 0);
pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset");
pollset->kicked_without_poller = false;
pollset->shutdown_closure = nullptr;
@@ -758,8 +763,20 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
pollable* pollable_obj, bool drain) {
GPR_TIMER_SCOPE("pollable_process_events", 0);
static const char* err_desc = "pollset_process_events";
+ // Use a simple heuristic to determine how many fd events to process
+ // per loop iteration. (events/workers)
+ int handle_count = 1;
+ int worker_count = gpr_atm_no_barrier_load(&pollset->worker_count);
+ GPR_ASSERT(worker_count > 0);
+ handle_count =
+ (pollable_obj->event_count - pollable_obj->event_cursor) / worker_count;
+ if (handle_count == 0) {
+ handle_count = 1;
+ } else if (handle_count > MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) {
+ handle_count = MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL;
+ }
grpc_error* error = GRPC_ERROR_NONE;
- for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) &&
+ for (int i = 0; (drain || i < handle_count) &&
pollable_obj->event_cursor != pollable_obj->event_count;
i++) {
int n = pollable_obj->event_cursor++;
@@ -884,6 +901,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
GPR_TIMER_SCOPE("begin_worker", 0);
bool do_poll =
(pollset->shutdown_closure == nullptr && !pollset->already_shutdown);
+ gpr_atm_no_barrier_fetch_add(&pollset->worker_count, 1);
if (worker_hdl != nullptr) *worker_hdl = worker;
worker->initialized_cv = false;
worker->kicked = false;
@@ -964,6 +982,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
+ gpr_atm_no_barrier_fetch_add(&pollset->worker_count, -1);
}
#ifndef NDEBUG
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 2e375b4022..d9aba9b6a3 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -1530,6 +1530,12 @@ static void run_poll(void* args) {
// This function overrides poll() to handle condition variable wakeup fds
static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
+ if (timeout == 0) {
+ // Don't bother using background threads for polling if timeout is 0,
+ // poll-cv might not wait for a poll to return otherwise.
+ // https://github.com/grpc/grpc/issues/13298
+ return poll(fds, nfds, 0);
+ }
unsigned int i;
int res, idx;
grpc_cv_node* pollcv;
diff --git a/src/core/lib/iomgr/socket_utils_linux.cc b/src/core/lib/iomgr/socket_utils_linux.cc
index b0207578de..34f93cc4b0 100644
--- a/src/core/lib/iomgr/socket_utils_linux.cc
+++ b/src/core/lib/iomgr/socket_utils_linux.cc
@@ -33,7 +33,6 @@
int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock,
int cloexec) {
int flags = 0;
- GPR_ASSERT(sizeof(socklen_t) <= sizeof(size_t));
flags |= nonblock ? SOCK_NONBLOCK : 0;
flags |= cloexec ? SOCK_CLOEXEC : 0;
return accept4(sockfd, reinterpret_cast<grpc_sockaddr*>(resolved_addr->addr),
diff --git a/src/core/lib/iomgr/socket_utils_posix.cc b/src/core/lib/iomgr/socket_utils_posix.cc
index 2a49583ac4..c48da52ffb 100644
--- a/src/core/lib/iomgr/socket_utils_posix.cc
+++ b/src/core/lib/iomgr/socket_utils_posix.cc
@@ -34,9 +34,8 @@
int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock,
int cloexec) {
int fd, flags;
- GPR_ASSERT(sizeof(socklen_t) <= sizeof(size_t));
- fd = accept(sockfd, (grpc_sockaddr*)resolved_addr->addr,
- (socklen_t*)&resolved_addr->len);
+ fd = accept(sockfd, reinterpret_cast<grpc_sockaddr*>(resolved_addr->addr),
+ &resolved_addr->len);
if (fd >= 0) {
if (nonblock) {
flags = fcntl(fd, F_GETFL, 0);
diff --git a/src/core/lib/iomgr/socket_utils_uv.cc b/src/core/lib/iomgr/socket_utils_uv.cc
index 8538abc7e4..7eba40c46b 100644
--- a/src/core/lib/iomgr/socket_utils_uv.cc
+++ b/src/core/lib/iomgr/socket_utils_uv.cc
@@ -38,8 +38,8 @@ int grpc_inet_pton(int af, const char* src, void* dst) {
}
const char* grpc_inet_ntop(int af, const void* src, char* dst, size_t size) {
- /* Windows InetNtopA wants a mutable ip pointer */
- return inet_ntop(af, src, dst, (socklen_t)size);
+ uv_inet_ntop(af, src, dst, size);
+ return dst;
}
#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_custom.h b/src/core/lib/iomgr/tcp_custom.h
index 22caa149f8..784ef84222 100644
--- a/src/core/lib/iomgr/tcp_custom.h
+++ b/src/core/lib/iomgr/tcp_custom.h
@@ -62,8 +62,6 @@ typedef struct grpc_socket_vtable {
const grpc_sockaddr* addr, int* len);
grpc_error* (*getsockname)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len);
- grpc_error* (*setsockopt)(grpc_custom_socket* socket, int level, int optname,
- const void* optval, uint32_t optlen);
grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, int flags);
grpc_error* (*listen)(grpc_custom_socket* socket);
diff --git a/src/core/lib/iomgr/tcp_server_custom.cc b/src/core/lib/iomgr/tcp_server_custom.cc
index be92e61b62..79ba5c39ee 100644
--- a/src/core/lib/iomgr/tcp_server_custom.cc
+++ b/src/core/lib/iomgr/tcp_server_custom.cc
@@ -393,13 +393,6 @@ static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
grpc_custom_socket_vtable->init(socket, family);
if (error == GRPC_ERROR_NONE) {
-#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
- if (family == AF_INET || family == AF_INET6) {
- int enable = 1;
- grpc_custom_socket_vtable->setsockopt(socket, SOL_SOCKET, SO_REUSEPORT,
- &enable, sizeof(enable));
- }
-#endif /* GPR_LINUX && SO_REUSEPORT */
error = add_socket_to_server(s, socket, addr, port_index, &sp);
}
gpr_free(allocated_addr);
diff --git a/src/core/lib/iomgr/tcp_server_windows.cc b/src/core/lib/iomgr/tcp_server_windows.cc
index 77f3811dca..b01afdcc9d 100644
--- a/src/core/lib/iomgr/tcp_server_windows.cc
+++ b/src/core/lib/iomgr/tcp_server_windows.cc
@@ -129,6 +129,7 @@ static void destroy_server(void* arg, grpc_error* error) {
gpr_free(sp);
}
grpc_channel_args_destroy(s->channel_args);
+ gpr_mu_destroy(&s->mu);
gpr_free(s);
}
diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc
index 5e3166926b..f20f8dcb74 100644
--- a/src/core/lib/iomgr/tcp_uv.cc
+++ b/src/core/lib/iomgr/tcp_uv.cc
@@ -192,6 +192,15 @@ static grpc_error* uv_socket_init_helper(uv_socket_t* uv_socket, int domain) {
if (status != 0) {
return tcp_error_create("Failed to initialize UV tcp handle", status);
}
+#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
+ if (domain == AF_INET || domain == AF_INET6) {
+ int enable = 1;
+ int fd;
+ uv_fileno((uv_handle_t*)tcp, &fd);
+ // TODO Handle error here.
+ setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
+ }
+#endif
uv_socket->write_buffers = nullptr;
uv_socket->read_len = 0;
uv_tcp_nodelay(uv_socket->handle, 1);
@@ -299,17 +308,6 @@ static grpc_error* uv_socket_listen(grpc_custom_socket* socket) {
return tcp_error_create("Failed to listen to port", status);
}
-static grpc_error* uv_socket_setsockopt(grpc_custom_socket* socket, int level,
- int option_name, const void* optval,
- socklen_t option_len) {
- int fd;
- uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
- uv_fileno((uv_handle_t*)uv_socket->handle, &fd);
- // TODO Handle error here. Also, does this work on windows??
- setsockopt(fd, level, option_name, &optval, (socklen_t)option_len);
- return GRPC_ERROR_NONE;
-}
-
static void uv_tc_on_connect(uv_connect_t* req, int status) {
grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
@@ -340,7 +338,6 @@ static void uv_socket_connect(grpc_custom_socket* socket,
static grpc_resolved_addresses* handle_addrinfo_result(
struct addrinfo* result) {
struct addrinfo* resp;
- struct addrinfo* prev;
size_t i;
grpc_resolved_addresses* addresses =
(grpc_resolved_addresses*)gpr_malloc(sizeof(grpc_resolved_addresses));
@@ -350,16 +347,13 @@ static grpc_resolved_addresses* handle_addrinfo_result(
}
addresses->addrs = (grpc_resolved_address*)gpr_malloc(
sizeof(grpc_resolved_address) * addresses->naddrs);
- i = 0;
- resp = result;
- while (resp != nullptr) {
+ for (resp = result, i = 0; resp != nullptr; resp = resp->ai_next, i++) {
memcpy(&addresses->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
addresses->addrs[i].len = resp->ai_addrlen;
- i++;
- prev = resp;
- resp = resp->ai_next;
- gpr_free(prev);
}
+ // addrinfo objects are allocated by libuv (e.g. in uv_getaddrinfo)
+ // and not by gpr_malloc
+ uv_freeaddrinfo(result);
return addresses;
}
@@ -415,10 +409,9 @@ static void uv_resolve_async(grpc_custom_resolver* r, char* host, char* port) {
grpc_custom_resolver_vtable uv_resolver_vtable = {uv_resolve, uv_resolve_async};
grpc_socket_vtable grpc_uv_socket_vtable = {
- uv_socket_init, uv_socket_connect, uv_socket_destroy,
- uv_socket_shutdown, uv_socket_close, uv_socket_write,
- uv_socket_read, uv_socket_getpeername, uv_socket_getsockname,
- uv_socket_setsockopt, uv_socket_bind, uv_socket_listen,
- uv_socket_accept};
+ uv_socket_init, uv_socket_connect, uv_socket_destroy,
+ uv_socket_shutdown, uv_socket_close, uv_socket_write,
+ uv_socket_read, uv_socket_getpeername, uv_socket_getsockname,
+ uv_socket_bind, uv_socket_listen, uv_socket_accept};
#endif
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc
index 9a4da59624..e8f14dd2e5 100644
--- a/src/core/lib/iomgr/timer_generic.cc
+++ b/src/core/lib/iomgr/timer_generic.cc
@@ -98,6 +98,12 @@ static void init_timer_ht() {
}
}
+static void destroy_timer_ht() {
+ for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
+ gpr_mu_destroy(&g_hash_mu[i]);
+ }
+}
+
static bool is_in_ht(grpc_timer* t) {
size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);
@@ -189,6 +195,7 @@ static void validate_non_pending_timer(grpc_timer* t) {
}
#define INIT_TIMER_HASH_TABLE() init_timer_ht()
+#define DESTROY_TIMER_HASH_TABLE() destroy_timer_ht()
#define ADD_TO_HASH_TABLE(t) add_to_ht((t))
#define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t))
#define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t))
@@ -196,6 +203,7 @@ static void validate_non_pending_timer(grpc_timer* t) {
#else
#define INIT_TIMER_HASH_TABLE()
+#define DESTROY_TIMER_HASH_TABLE()
#define ADD_TO_HASH_TABLE(t)
#define REMOVE_FROM_HASH_TABLE(t)
#define VALIDATE_NON_PENDING_TIMER(t)
@@ -299,6 +307,8 @@ static void timer_list_shutdown() {
gpr_free(g_shards);
gpr_free(g_shard_queue);
g_shared_mutables.initialized = false;
+
+ DESTROY_TIMER_HASH_TABLE();
}
/* returns true if the first element in the list */