diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/client_config/subchannel.h | 8 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_posix.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_windows.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 9 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.h | 8 | ||||
-rw-r--r-- | src/core/support/block_annotate.h | 48 | ||||
-rw-r--r-- | src/core/support/file.c | 6 | ||||
-rw-r--r-- | src/core/support/slice_buffer.c | 9 | ||||
-rw-r--r-- | src/core/support/time_posix.c | 7 | ||||
-rw-r--r-- | src/core/support/time_precise.h | 2 | ||||
-rw-r--r-- | src/core/support/time_win32.c | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2/incoming_metadata.c | 2 |
16 files changed, 113 insertions, 17 deletions
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index c9e5861d9c..2af3ce9e56 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -64,13 +64,13 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif -void grpc_subchannel_ref( - grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_ref(grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_ref( - grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_ref(grpc_subchannel_call *call + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 5626b08a47..a4293eb4a4 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -42,6 +42,7 @@ #include <unistd.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -179,7 +180,9 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( pfds[1].events = POLLIN; pfds[1].revents = 0; + GRPC_SCHEDULING_START_BLOCKING_REGION; poll_rv = grpc_poll_function(pfds, 2, timeout_ms); + GRPC_SCHEDULING_END_BLOCKING_REGION; if (poll_rv < 0) { if (errno != EINTR) { @@ -193,6 +196,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( } if (pfds[1].revents) { do { + /* The following epoll_wait never blocks; it has a timeout of 0 */ ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); if (ep_rv < 0) { if (errno != EINTR) { diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 2a18cedb33..44031b8ef6 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -44,6 +44,7 @@ #include "src/core/iomgr/fd_posix.h" #include "src/core/iomgr/iomgr_internal.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -147,7 +148,9 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( POLLOUT, &watchers[i]); } + GRPC_SCHEDULING_START_BLOCKING_REGION; r = grpc_poll_function(pfds, pfd_count, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; for (i = 1; i < pfd_count; i++) { grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN, diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 43ad22c16d..10bab134d7 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -47,6 +47,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> @@ -468,7 +469,9 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, /* poll fd count (argument 2) is shortened by one if we have no events to poll on - such that it only includes the kicker */ + GRPC_SCHEDULING_START_BLOCKING_REGION; r = grpc_poll_function(pfd, nfds, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); if (fd) { diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index 99bd566e10..ed0a93fcc9 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -43,6 +43,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> @@ -103,13 +104,18 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ + GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); + GRPC_SCHEDULING_END_BLOCKING_REGION; + if (s != 0) { /* Retry if well-known service name is recognized */ char *svc[][2] = {{"http", "80"}, {"https", "443"}}; for (i = 0; i < GPR_ARRAY_SIZE(svc); i++) { if (strcmp(port, svc[i][0]) == 0) { + GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, svc[i][1], &hints, &result); + GRPC_SCHEDULING_END_BLOCKING_REGION; break; } } diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index fcd80b3912..82a5602996 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -42,6 +42,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> @@ -88,7 +89,9 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ + GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); + GRPC_SCHEDULING_END_BLOCKING_REGION; if (s != 0) { gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s)); goto done; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index c98f0125f8..54ebad7dbc 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -78,6 +78,9 @@ typedef struct { size_t slice_size; gpr_refcount refcount; + /* garbage after the last read */ + gpr_slice_buffer last_read_buffer; + gpr_slice_buffer *incoming_buffer; gpr_slice_buffer *outgoing_buffer; /** slice within outgoing_buffer to write next */ @@ -106,6 +109,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_fd_orphan(exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan"); + gpr_slice_buffer_destroy(&tcp->last_read_buffer); gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -226,7 +230,8 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if ((size_t)read_bytes < tcp->incoming_buffer->length) { gpr_slice_buffer_trim_end( tcp->incoming_buffer, - tcp->incoming_buffer->length - (size_t)read_bytes); + tcp->incoming_buffer->length - (size_t)read_bytes, + &tcp->last_read_buffer); } else if (tcp->iov_size < MAX_READ_IOVEC) { ++tcp->iov_size; } @@ -259,6 +264,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->read_cb = cb; tcp->incoming_buffer = incoming_buffer; gpr_slice_buffer_reset_and_unref(incoming_buffer); + gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = 0; @@ -457,6 +463,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->read_closure.cb_arg = tcp; tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; + gpr_slice_buffer_init(&tcp->last_read_buffer); return &tcp->base; } diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index ae7c889d0c..9baaf1edc7 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -117,6 +117,8 @@ struct grpc_udp_server { grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; + /* The parent grpc server */ + grpc_server* grpc_server; }; grpc_udp_server *grpc_udp_server_create(void) { @@ -276,7 +278,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { /* Tell the registered callback that data is available to read. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(sp->fd); + sp->read_cb(sp->fd, sp->server->grpc_server); /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); @@ -402,11 +404,13 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { } void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, - grpc_pollset **pollsets, size_t pollset_count) { + grpc_pollset **pollsets, size_t pollset_count, + grpc_server *server) { size_t i, j; gpr_mu_lock(&s->mu); GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; + s->grpc_server = server; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd); diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index 76082d7761..8e3abae864 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -36,18 +36,22 @@ #include "src/core/iomgr/endpoint.h" +/* Forward decl of grpc_server */ +typedef struct grpc_server grpc_server; + /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; /* Called when data is available to read from the socket. */ -typedef void (*grpc_udp_server_read_cb)(int fd); +typedef void (*grpc_udp_server_read_cb)(int fd, grpc_server *server); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); /* Start listening to bound ports */ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, - grpc_pollset **pollsets, size_t pollset_count); + grpc_pollset **pollsets, size_t pollset_count, + grpc_server *server); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); diff --git a/src/core/support/block_annotate.h b/src/core/support/block_annotate.h new file mode 100644 index 0000000000..3cd8eee272 --- /dev/null +++ b/src/core/support/block_annotate.h @@ -0,0 +1,48 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H +#define GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H + +/* These annotations identify the beginning and end of regions where + the code may block for reasons other than synchronization functions. + These include poll, epoll, and getaddrinfo. */ + +#define GRPC_SCHEDULING_START_BLOCKING_REGION \ + do { \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION \ + do { \ + } while (0) + +#endif /* GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H */ diff --git a/src/core/support/file.c b/src/core/support/file.c index c1361d8a9e..8c673dbcc6 100644 --- a/src/core/support/file.c +++ b/src/core/support/file.c @@ -40,6 +40,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/support/block_annotate.h" #include "src/core/support/string.h" gpr_slice gpr_load_file(const char *filename, int add_null_terminator, @@ -48,9 +49,11 @@ gpr_slice gpr_load_file(const char *filename, int add_null_terminator, size_t contents_size = 0; char *error_msg = NULL; gpr_slice result = gpr_empty_slice(); - FILE *file = fopen(filename, "rb"); + FILE *file; size_t bytes_read = 0; + GRPC_SCHEDULING_START_BLOCKING_REGION; + file = fopen(filename, "rb"); if (file == NULL) { gpr_asprintf(&error_msg, "Could not open file %s (error = %s).", filename, strerror(errno)); @@ -83,5 +86,6 @@ end: if (success != NULL) *success = 0; } if (file != NULL) fclose(file); + GRPC_SCHEDULING_END_BLOCKING_REGION; return result; } diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c index 8873d459d5..a1aa56fd72 100644 --- a/src/core/support/slice_buffer.c +++ b/src/core/support/slice_buffer.c @@ -208,7 +208,7 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst) { src->length = 0; } -void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) { +void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n, gpr_slice_buffer *garbage) { GPR_ASSERT(n <= sb->length); sb->length -= n; for (;;) { @@ -216,14 +216,15 @@ void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) { gpr_slice slice = sb->slices[idx]; size_t slice_len = GPR_SLICE_LENGTH(slice); if (slice_len > n) { - sb->slices[idx] = gpr_slice_sub_no_ref(slice, 0, slice_len - n); + sb->slices[idx] = gpr_slice_split_head(&slice, slice_len - n); + gpr_slice_buffer_add_indexed(garbage, slice); return; } else if (slice_len == n) { - gpr_slice_unref(slice); + gpr_slice_buffer_add_indexed(garbage, slice); sb->count = idx; return; } else { - gpr_slice_unref(slice); + gpr_slice_buffer_add_indexed(garbage, slice); n -= slice_len; sb->count = idx; } diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index dcecff0d05..eedfd0a060 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -41,6 +41,7 @@ #include <unistd.h> #include <grpc/support/log.h> #include <grpc/support/time.h> +#include "src/core/support/block_annotate.h" static struct timespec timespec_from_gpr(gpr_timespec gts) { struct timespec rv; @@ -126,6 +127,7 @@ void gpr_sleep_until(gpr_timespec until) { gpr_timespec now; gpr_timespec delta; struct timespec delta_ts; + int ns_result; for (;;) { /* We could simplify by using clock_nanosleep instead, but it might be @@ -137,7 +139,10 @@ void gpr_sleep_until(gpr_timespec until) { delta = gpr_time_sub(until, now); delta_ts = timespec_from_gpr(delta); - if (nanosleep(&delta_ts, NULL) == 0) { + GRPC_SCHEDULING_START_BLOCKING_REGION; + ns_result = nanosleep(&delta_ts, NULL); + GRPC_SCHEDULING_END_BLOCKING_REGION; + if (ns_result == 0) { break; } } diff --git a/src/core/support/time_precise.h b/src/core/support/time_precise.h index a72d37e2f1..cd201faab9 100644 --- a/src/core/support/time_precise.h +++ b/src/core/support/time_precise.h @@ -84,7 +84,7 @@ static void gpr_precise_clock_now(gpr_timespec *clk) { } #else /* GRPC_TIMERS_RDTSC */ -static void gpr_precise_clock_now(gpr_timespec* clk) { +static void gpr_precise_clock_now(gpr_timespec *clk) { *clk = gpr_now(GPR_CLOCK_REALTIME); clk->clock_type = GPR_CLOCK_PRECISE; } diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index f794855429..bc0586d069 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -41,6 +41,8 @@ #include <src/core/support/time_precise.h> #include <sys/timeb.h> +#include "src/core/support/block_annotate.h" + static LARGE_INTEGER g_start_time; static double g_time_scale; @@ -92,7 +94,9 @@ void gpr_sleep_until(gpr_timespec until) { delta = gpr_time_sub(until, now); sleep_millis = (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS; + GRPC_SCHEDULING_START_BLOCKING_REGION; Sleep(sleep_millis); + GRPC_SCHEDULING_END_BLOCKING_REGION; } } diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index d216c42113..10c64f3356 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -171,7 +171,7 @@ void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( size_t copy_bytes = sizeof(*buffer->elems) * new_count; GPR_ASSERT(mdidx < buffer->count); buffer->elems = gpr_malloc(copy_bytes); - memcpy(live_op_buffer->elems + mdidx, buffer->elems, copy_bytes); + memcpy(buffer->elems, live_op_buffer->elems + mdidx, copy_bytes); buffer->count = buffer->capacity = new_count; } else { buffer->elems = NULL; |