aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-09-29 08:22:02 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-09-29 08:22:02 -0700
commit6b5bf16240039c304b494bb341eb3820271e27d6 (patch)
tree9ad0a1fd0810d53724e38f2bba3e672fb89ebb35 /src/core/iomgr
parent609779a252dd84c3cd800bfe644547641a5a90ab (diff)
parent2d8c6b37d1d86c4e98ed7cb7b7db213cc766a22f (diff)
Merge github.com:grpc/grpc into can-we-be-single-threaded
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c4
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c3
-rw-r--r--src/core/iomgr/pollset_posix.c5
-rw-r--r--src/core/iomgr/resolve_address_posix.c6
-rw-r--r--src/core/iomgr/resolve_address_windows.c3
-rw-r--r--src/core/iomgr/tcp_posix.c9
-rw-r--r--src/core/iomgr/udp_server.c8
-rw-r--r--src/core/iomgr/udp_server.h8
8 files changed, 41 insertions, 5 deletions
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 5626b08a47..a4293eb4a4 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -42,6 +42,7 @@
#include <unistd.h>
#include "src/core/iomgr/fd_posix.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -179,7 +180,9 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
pfds[1].events = POLLIN;
pfds[1].revents = 0;
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
if (poll_rv < 0) {
if (errno != EINTR) {
@@ -193,6 +196,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
}
if (pfds[1].revents) {
do {
+ /* The following epoll_wait never blocks; it has a timeout of 0 */
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
if (ep_rv < 0) {
if (errno != EINTR) {
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 2a18cedb33..44031b8ef6 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -44,6 +44,7 @@
#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@@ -147,7 +148,9 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
POLLOUT, &watchers[i]);
}
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
r = grpc_poll_function(pfds, pfd_count, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
for (i = 1; i < pfd_count; i++) {
grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 43ad22c16d..e80963e0ea 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -47,6 +47,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/profiling/timers.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
@@ -118,11 +119,13 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
void grpc_pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
+ gpr_tls_init(&g_current_thread_worker);
grpc_wakeup_fd_global_init();
}
void grpc_pollset_global_shutdown(void) {
gpr_tls_destroy(&g_current_thread_poller);
+ gpr_tls_destroy(&g_current_thread_worker);
grpc_wakeup_fd_global_destroy();
}
@@ -468,7 +471,9 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* poll fd count (argument 2) is shortened by one if we have no events
to poll on - such that it only includes the kicker */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
r = grpc_poll_function(pfd, nfds, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index 99bd566e10..ed0a93fcc9 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -43,6 +43,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/support/block_annotate.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
@@ -103,13 +104,18 @@ grpc_resolved_addresses *grpc_blocking_resolve_address(
hints.ai_socktype = SOCK_STREAM; /* stream socket */
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, port, &hints, &result);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+
if (s != 0) {
/* Retry if well-known service name is recognized */
char *svc[][2] = {{"http", "80"}, {"https", "443"}};
for (i = 0; i < GPR_ARRAY_SIZE(svc); i++) {
if (strcmp(port, svc[i][0]) == 0) {
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, svc[i][1], &hints, &result);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
break;
}
}
diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c
index fcd80b3912..82a5602996 100644
--- a/src/core/iomgr/resolve_address_windows.c
+++ b/src/core/iomgr/resolve_address_windows.c
@@ -42,6 +42,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/support/block_annotate.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
@@ -88,7 +89,9 @@ grpc_resolved_addresses *grpc_blocking_resolve_address(
hints.ai_socktype = SOCK_STREAM; /* stream socket */
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, port, &hints, &result);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
if (s != 0) {
gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s));
goto done;
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index c98f0125f8..54ebad7dbc 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -78,6 +78,9 @@ typedef struct {
size_t slice_size;
gpr_refcount refcount;
+ /* garbage after the last read */
+ gpr_slice_buffer last_read_buffer;
+
gpr_slice_buffer *incoming_buffer;
gpr_slice_buffer *outgoing_buffer;
/** slice within outgoing_buffer to write next */
@@ -106,6 +109,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan");
+ gpr_slice_buffer_destroy(&tcp->last_read_buffer);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
@@ -226,7 +230,8 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
gpr_slice_buffer_trim_end(
tcp->incoming_buffer,
- tcp->incoming_buffer->length - (size_t)read_bytes);
+ tcp->incoming_buffer->length - (size_t)read_bytes,
+ &tcp->last_read_buffer);
} else if (tcp->iov_size < MAX_READ_IOVEC) {
++tcp->iov_size;
}
@@ -259,6 +264,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->read_cb = cb;
tcp->incoming_buffer = incoming_buffer;
gpr_slice_buffer_reset_and_unref(incoming_buffer);
+ gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = 0;
@@ -457,6 +463,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
+ gpr_slice_buffer_init(&tcp->last_read_buffer);
return &tcp->base;
}
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index ae7c889d0c..9baaf1edc7 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -117,6 +117,8 @@ struct grpc_udp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
+ /* The parent grpc server */
+ grpc_server* grpc_server;
};
grpc_udp_server *grpc_udp_server_create(void) {
@@ -276,7 +278,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
/* Tell the registered callback that data is available to read. */
GPR_ASSERT(sp->read_cb);
- sp->read_cb(sp->fd);
+ sp->read_cb(sp->fd, sp->server->grpc_server);
/* Re-arm the notification event so we get another chance to read. */
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
@@ -402,11 +404,13 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) {
}
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
- grpc_pollset **pollsets, size_t pollset_count) {
+ grpc_pollset **pollsets, size_t pollset_count,
+ grpc_server *server) {
size_t i, j;
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
+ s->grpc_server = server;
for (i = 0; i < s->nports; i++) {
for (j = 0; j < pollset_count; j++) {
grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd);
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
index 76082d7761..8e3abae864 100644
--- a/src/core/iomgr/udp_server.h
+++ b/src/core/iomgr/udp_server.h
@@ -36,18 +36,22 @@
#include "src/core/iomgr/endpoint.h"
+/* Forward decl of grpc_server */
+typedef struct grpc_server grpc_server;
+
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
/* Called when data is available to read from the socket. */
-typedef void (*grpc_udp_server_read_cb)(int fd);
+typedef void (*grpc_udp_server_read_cb)(int fd, grpc_server *server);
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
/* Start listening to bound ports */
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server,
- grpc_pollset **pollsets, size_t pollset_count);
+ grpc_pollset **pollsets, size_t pollset_count,
+ grpc_server *server);
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);