aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/client_config/subchannel.h8
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c4
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c3
-rw-r--r--src/core/iomgr/pollset_posix.c3
-rw-r--r--src/core/iomgr/resolve_address_posix.c6
-rw-r--r--src/core/iomgr/resolve_address_windows.c3
-rw-r--r--src/core/iomgr/tcp_posix.c9
-rw-r--r--src/core/iomgr/udp_server.c8
-rw-r--r--src/core/iomgr/udp_server.h8
-rw-r--r--src/core/support/block_annotate.h48
-rw-r--r--src/core/support/file.c6
-rw-r--r--src/core/support/slice_buffer.c9
-rw-r--r--src/core/support/time_posix.c7
-rw-r--r--src/core/support/time_precise.h2
-rw-r--r--src/core/support/time_win32.c4
-rw-r--r--src/core/transport/chttp2/incoming_metadata.c2
16 files changed, 113 insertions, 17 deletions
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index c9e5861d9c..2af3ce9e56 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -64,13 +64,13 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif
-void grpc_subchannel_ref(
- grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_ref(grpc_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-void grpc_subchannel_call_ref(
- grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_call_ref(grpc_subchannel_call *call
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 5626b08a47..a4293eb4a4 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -42,6 +42,7 @@
#include <unistd.h>
#include "src/core/iomgr/fd_posix.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -179,7 +180,9 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
pfds[1].events = POLLIN;
pfds[1].revents = 0;
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
if (poll_rv < 0) {
if (errno != EINTR) {
@@ -193,6 +196,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
}
if (pfds[1].revents) {
do {
+ /* The following epoll_wait never blocks; it has a timeout of 0 */
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
if (ep_rv < 0) {
if (errno != EINTR) {
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 2a18cedb33..44031b8ef6 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -44,6 +44,7 @@
#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@@ -147,7 +148,9 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
POLLOUT, &watchers[i]);
}
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
r = grpc_poll_function(pfds, pfd_count, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
for (i = 1; i < pfd_count; i++) {
grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 43ad22c16d..10bab134d7 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -47,6 +47,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/profiling/timers.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
@@ -468,7 +469,9 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* poll fd count (argument 2) is shortened by one if we have no events
to poll on - such that it only includes the kicker */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
r = grpc_poll_function(pfd, nfds, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index 99bd566e10..ed0a93fcc9 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -43,6 +43,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/support/block_annotate.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
@@ -103,13 +104,18 @@ grpc_resolved_addresses *grpc_blocking_resolve_address(
hints.ai_socktype = SOCK_STREAM; /* stream socket */
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, port, &hints, &result);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+
if (s != 0) {
/* Retry if well-known service name is recognized */
char *svc[][2] = {{"http", "80"}, {"https", "443"}};
for (i = 0; i < GPR_ARRAY_SIZE(svc); i++) {
if (strcmp(port, svc[i][0]) == 0) {
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, svc[i][1], &hints, &result);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
break;
}
}
diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c
index fcd80b3912..82a5602996 100644
--- a/src/core/iomgr/resolve_address_windows.c
+++ b/src/core/iomgr/resolve_address_windows.c
@@ -42,6 +42,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/support/block_annotate.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
@@ -88,7 +89,9 @@ grpc_resolved_addresses *grpc_blocking_resolve_address(
hints.ai_socktype = SOCK_STREAM; /* stream socket */
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, port, &hints, &result);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
if (s != 0) {
gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s));
goto done;
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index c98f0125f8..54ebad7dbc 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -78,6 +78,9 @@ typedef struct {
size_t slice_size;
gpr_refcount refcount;
+ /* garbage after the last read */
+ gpr_slice_buffer last_read_buffer;
+
gpr_slice_buffer *incoming_buffer;
gpr_slice_buffer *outgoing_buffer;
/** slice within outgoing_buffer to write next */
@@ -106,6 +109,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan");
+ gpr_slice_buffer_destroy(&tcp->last_read_buffer);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
@@ -226,7 +230,8 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
gpr_slice_buffer_trim_end(
tcp->incoming_buffer,
- tcp->incoming_buffer->length - (size_t)read_bytes);
+ tcp->incoming_buffer->length - (size_t)read_bytes,
+ &tcp->last_read_buffer);
} else if (tcp->iov_size < MAX_READ_IOVEC) {
++tcp->iov_size;
}
@@ -259,6 +264,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->read_cb = cb;
tcp->incoming_buffer = incoming_buffer;
gpr_slice_buffer_reset_and_unref(incoming_buffer);
+ gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = 0;
@@ -457,6 +463,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
+ gpr_slice_buffer_init(&tcp->last_read_buffer);
return &tcp->base;
}
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index ae7c889d0c..9baaf1edc7 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -117,6 +117,8 @@ struct grpc_udp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
+ /* The parent grpc server */
+ grpc_server* grpc_server;
};
grpc_udp_server *grpc_udp_server_create(void) {
@@ -276,7 +278,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
/* Tell the registered callback that data is available to read. */
GPR_ASSERT(sp->read_cb);
- sp->read_cb(sp->fd);
+ sp->read_cb(sp->fd, sp->server->grpc_server);
/* Re-arm the notification event so we get another chance to read. */
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
@@ -402,11 +404,13 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) {
}
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
- grpc_pollset **pollsets, size_t pollset_count) {
+ grpc_pollset **pollsets, size_t pollset_count,
+ grpc_server *server) {
size_t i, j;
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
+ s->grpc_server = server;
for (i = 0; i < s->nports; i++) {
for (j = 0; j < pollset_count; j++) {
grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd);
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
index 76082d7761..8e3abae864 100644
--- a/src/core/iomgr/udp_server.h
+++ b/src/core/iomgr/udp_server.h
@@ -36,18 +36,22 @@
#include "src/core/iomgr/endpoint.h"
+/* Forward decl of grpc_server */
+typedef struct grpc_server grpc_server;
+
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
/* Called when data is available to read from the socket. */
-typedef void (*grpc_udp_server_read_cb)(int fd);
+typedef void (*grpc_udp_server_read_cb)(int fd, grpc_server *server);
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
/* Start listening to bound ports */
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server,
- grpc_pollset **pollsets, size_t pollset_count);
+ grpc_pollset **pollsets, size_t pollset_count,
+ grpc_server *server);
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
diff --git a/src/core/support/block_annotate.h b/src/core/support/block_annotate.h
new file mode 100644
index 0000000000..3cd8eee272
--- /dev/null
+++ b/src/core/support/block_annotate.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H
+#define GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H
+
+/* These annotations identify the beginning and end of regions where
+ the code may block for reasons other than synchronization functions.
+ These include poll, epoll, and getaddrinfo. */
+
+#define GRPC_SCHEDULING_START_BLOCKING_REGION \
+ do { \
+ } while (0)
+#define GRPC_SCHEDULING_END_BLOCKING_REGION \
+ do { \
+ } while (0)
+
+#endif /* GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H */
diff --git a/src/core/support/file.c b/src/core/support/file.c
index c1361d8a9e..8c673dbcc6 100644
--- a/src/core/support/file.c
+++ b/src/core/support/file.c
@@ -40,6 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include "src/core/support/block_annotate.h"
#include "src/core/support/string.h"
gpr_slice gpr_load_file(const char *filename, int add_null_terminator,
@@ -48,9 +49,11 @@ gpr_slice gpr_load_file(const char *filename, int add_null_terminator,
size_t contents_size = 0;
char *error_msg = NULL;
gpr_slice result = gpr_empty_slice();
- FILE *file = fopen(filename, "rb");
+ FILE *file;
size_t bytes_read = 0;
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ file = fopen(filename, "rb");
if (file == NULL) {
gpr_asprintf(&error_msg, "Could not open file %s (error = %s).", filename,
strerror(errno));
@@ -83,5 +86,6 @@ end:
if (success != NULL) *success = 0;
}
if (file != NULL) fclose(file);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
return result;
}
diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c
index 8873d459d5..a1aa56fd72 100644
--- a/src/core/support/slice_buffer.c
+++ b/src/core/support/slice_buffer.c
@@ -208,7 +208,7 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst) {
src->length = 0;
}
-void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) {
+void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n, gpr_slice_buffer *garbage) {
GPR_ASSERT(n <= sb->length);
sb->length -= n;
for (;;) {
@@ -216,14 +216,15 @@ void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) {
gpr_slice slice = sb->slices[idx];
size_t slice_len = GPR_SLICE_LENGTH(slice);
if (slice_len > n) {
- sb->slices[idx] = gpr_slice_sub_no_ref(slice, 0, slice_len - n);
+ sb->slices[idx] = gpr_slice_split_head(&slice, slice_len - n);
+ gpr_slice_buffer_add_indexed(garbage, slice);
return;
} else if (slice_len == n) {
- gpr_slice_unref(slice);
+ gpr_slice_buffer_add_indexed(garbage, slice);
sb->count = idx;
return;
} else {
- gpr_slice_unref(slice);
+ gpr_slice_buffer_add_indexed(garbage, slice);
n -= slice_len;
sb->count = idx;
}
diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c
index dcecff0d05..eedfd0a060 100644
--- a/src/core/support/time_posix.c
+++ b/src/core/support/time_posix.c
@@ -41,6 +41,7 @@
#include <unistd.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
+#include "src/core/support/block_annotate.h"
static struct timespec timespec_from_gpr(gpr_timespec gts) {
struct timespec rv;
@@ -126,6 +127,7 @@ void gpr_sleep_until(gpr_timespec until) {
gpr_timespec now;
gpr_timespec delta;
struct timespec delta_ts;
+ int ns_result;
for (;;) {
/* We could simplify by using clock_nanosleep instead, but it might be
@@ -137,7 +139,10 @@ void gpr_sleep_until(gpr_timespec until) {
delta = gpr_time_sub(until, now);
delta_ts = timespec_from_gpr(delta);
- if (nanosleep(&delta_ts, NULL) == 0) {
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ ns_result = nanosleep(&delta_ts, NULL);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ if (ns_result == 0) {
break;
}
}
diff --git a/src/core/support/time_precise.h b/src/core/support/time_precise.h
index a72d37e2f1..cd201faab9 100644
--- a/src/core/support/time_precise.h
+++ b/src/core/support/time_precise.h
@@ -84,7 +84,7 @@ static void gpr_precise_clock_now(gpr_timespec *clk) {
}
#else /* GRPC_TIMERS_RDTSC */
-static void gpr_precise_clock_now(gpr_timespec* clk) {
+static void gpr_precise_clock_now(gpr_timespec *clk) {
*clk = gpr_now(GPR_CLOCK_REALTIME);
clk->clock_type = GPR_CLOCK_PRECISE;
}
diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c
index f794855429..bc0586d069 100644
--- a/src/core/support/time_win32.c
+++ b/src/core/support/time_win32.c
@@ -41,6 +41,8 @@
#include <src/core/support/time_precise.h>
#include <sys/timeb.h>
+#include "src/core/support/block_annotate.h"
+
static LARGE_INTEGER g_start_time;
static double g_time_scale;
@@ -92,7 +94,9 @@ void gpr_sleep_until(gpr_timespec until) {
delta = gpr_time_sub(until, now);
sleep_millis =
(DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS;
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
Sleep(sleep_millis);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
}
}
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c
index d216c42113..10c64f3356 100644
--- a/src/core/transport/chttp2/incoming_metadata.c
+++ b/src/core/transport/chttp2/incoming_metadata.c
@@ -171,7 +171,7 @@ void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
size_t copy_bytes = sizeof(*buffer->elems) * new_count;
GPR_ASSERT(mdidx < buffer->count);
buffer->elems = gpr_malloc(copy_bytes);
- memcpy(live_op_buffer->elems + mdidx, buffer->elems, copy_bytes);
+ memcpy(buffer->elems, live_op_buffer->elems + mdidx, copy_bytes);
buffer->count = buffer->capacity = new_count;
} else {
buffer->elems = NULL;