aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/client_config/subchannel.h8
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c4
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c3
-rw-r--r--src/core/iomgr/pollset_posix.c3
-rw-r--r--src/core/iomgr/resolve_address_posix.c6
-rw-r--r--src/core/iomgr/resolve_address_windows.c3
-rw-r--r--src/core/iomgr/udp_server.c8
-rw-r--r--src/core/iomgr/udp_server.h8
-rw-r--r--src/core/support/block_annotate.h44
-rw-r--r--src/core/support/file.c6
-rw-r--r--src/core/support/time_posix.c7
-rw-r--r--src/core/support/time_precise.h2
-rw-r--r--src/core/support/time_win32.c4
-rw-r--r--src/cpp/proto/proto_utils.cc17
-rw-r--r--src/cpp/server/secure_server_credentials.h2
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb16
16 files changed, 109 insertions, 32 deletions
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index c9e5861d9c..2af3ce9e56 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -64,13 +64,13 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif
-void grpc_subchannel_ref(
- grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_ref(grpc_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-void grpc_subchannel_call_ref(
- grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_call_ref(grpc_subchannel_call *call
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 5626b08a47..a4293eb4a4 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -42,6 +42,7 @@
#include <unistd.h>
#include "src/core/iomgr/fd_posix.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -179,7 +180,9 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
pfds[1].events = POLLIN;
pfds[1].revents = 0;
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
if (poll_rv < 0) {
if (errno != EINTR) {
@@ -193,6 +196,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
}
if (pfds[1].revents) {
do {
+ /* The following epoll_wait never blocks; it has a timeout of 0 */
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
if (ep_rv < 0) {
if (errno != EINTR) {
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 2a18cedb33..44031b8ef6 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -44,6 +44,7 @@
#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@@ -147,7 +148,9 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
POLLOUT, &watchers[i]);
}
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
r = grpc_poll_function(pfds, pfd_count, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
for (i = 1; i < pfd_count; i++) {
grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 43ad22c16d..10bab134d7 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -47,6 +47,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/profiling/timers.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
@@ -468,7 +469,9 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* poll fd count (argument 2) is shortened by one if we have no events
to poll on - such that it only includes the kicker */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
r = grpc_poll_function(pfd, nfds, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index 99bd566e10..ed0a93fcc9 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -43,6 +43,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/support/block_annotate.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
@@ -103,13 +104,18 @@ grpc_resolved_addresses *grpc_blocking_resolve_address(
hints.ai_socktype = SOCK_STREAM; /* stream socket */
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, port, &hints, &result);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+
if (s != 0) {
/* Retry if well-known service name is recognized */
char *svc[][2] = {{"http", "80"}, {"https", "443"}};
for (i = 0; i < GPR_ARRAY_SIZE(svc); i++) {
if (strcmp(port, svc[i][0]) == 0) {
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, svc[i][1], &hints, &result);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
break;
}
}
diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c
index fcd80b3912..82a5602996 100644
--- a/src/core/iomgr/resolve_address_windows.c
+++ b/src/core/iomgr/resolve_address_windows.c
@@ -42,6 +42,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/support/block_annotate.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
@@ -88,7 +89,9 @@ grpc_resolved_addresses *grpc_blocking_resolve_address(
hints.ai_socktype = SOCK_STREAM; /* stream socket */
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, port, &hints, &result);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
if (s != 0) {
gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s));
goto done;
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index ae7c889d0c..9baaf1edc7 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -117,6 +117,8 @@ struct grpc_udp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
+ /* The parent grpc server */
+ grpc_server* grpc_server;
};
grpc_udp_server *grpc_udp_server_create(void) {
@@ -276,7 +278,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
/* Tell the registered callback that data is available to read. */
GPR_ASSERT(sp->read_cb);
- sp->read_cb(sp->fd);
+ sp->read_cb(sp->fd, sp->server->grpc_server);
/* Re-arm the notification event so we get another chance to read. */
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
@@ -402,11 +404,13 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) {
}
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
- grpc_pollset **pollsets, size_t pollset_count) {
+ grpc_pollset **pollsets, size_t pollset_count,
+ grpc_server *server) {
size_t i, j;
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
+ s->grpc_server = server;
for (i = 0; i < s->nports; i++) {
for (j = 0; j < pollset_count; j++) {
grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd);
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
index 76082d7761..b6cf3c8ed9 100644
--- a/src/core/iomgr/udp_server.h
+++ b/src/core/iomgr/udp_server.h
@@ -36,18 +36,22 @@
#include "src/core/iomgr/endpoint.h"
+/* Forward decl of grpc_server */
+typedef struct grpc_server grpc_server;
+
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
/* Called when data is available to read from the socket. */
-typedef void (*grpc_udp_server_read_cb)(int fd);
+typedef void (*grpc_udp_server_read_cb)(int fd, grpc_server* server);
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
/* Start listening to bound ports */
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server,
- grpc_pollset **pollsets, size_t pollset_count);
+ grpc_pollset **pollsets, size_t pollset_count,
+ grpc_server *server);
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
diff --git a/src/core/support/block_annotate.h b/src/core/support/block_annotate.h
new file mode 100644
index 0000000000..bf2c17f859
--- /dev/null
+++ b/src/core/support/block_annotate.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H
+#define GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H
+
+/* These annotations identify the beginning and end of regions where
+ the code may block for reasons other than synchronization functions.
+ These include poll, epoll, and getaddrinfo. */
+
+#define GRPC_SCHEDULING_START_BLOCKING_REGION do {} while (0)
+#define GRPC_SCHEDULING_END_BLOCKING_REGION do {} while (0)
+
+#endif /* GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H */
diff --git a/src/core/support/file.c b/src/core/support/file.c
index c1361d8a9e..8c673dbcc6 100644
--- a/src/core/support/file.c
+++ b/src/core/support/file.c
@@ -40,6 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include "src/core/support/block_annotate.h"
#include "src/core/support/string.h"
gpr_slice gpr_load_file(const char *filename, int add_null_terminator,
@@ -48,9 +49,11 @@ gpr_slice gpr_load_file(const char *filename, int add_null_terminator,
size_t contents_size = 0;
char *error_msg = NULL;
gpr_slice result = gpr_empty_slice();
- FILE *file = fopen(filename, "rb");
+ FILE *file;
size_t bytes_read = 0;
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ file = fopen(filename, "rb");
if (file == NULL) {
gpr_asprintf(&error_msg, "Could not open file %s (error = %s).", filename,
strerror(errno));
@@ -83,5 +86,6 @@ end:
if (success != NULL) *success = 0;
}
if (file != NULL) fclose(file);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
return result;
}
diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c
index dcecff0d05..eedfd0a060 100644
--- a/src/core/support/time_posix.c
+++ b/src/core/support/time_posix.c
@@ -41,6 +41,7 @@
#include <unistd.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
+#include "src/core/support/block_annotate.h"
static struct timespec timespec_from_gpr(gpr_timespec gts) {
struct timespec rv;
@@ -126,6 +127,7 @@ void gpr_sleep_until(gpr_timespec until) {
gpr_timespec now;
gpr_timespec delta;
struct timespec delta_ts;
+ int ns_result;
for (;;) {
/* We could simplify by using clock_nanosleep instead, but it might be
@@ -137,7 +139,10 @@ void gpr_sleep_until(gpr_timespec until) {
delta = gpr_time_sub(until, now);
delta_ts = timespec_from_gpr(delta);
- if (nanosleep(&delta_ts, NULL) == 0) {
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ ns_result = nanosleep(&delta_ts, NULL);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ if (ns_result == 0) {
break;
}
}
diff --git a/src/core/support/time_precise.h b/src/core/support/time_precise.h
index a72d37e2f1..cd201faab9 100644
--- a/src/core/support/time_precise.h
+++ b/src/core/support/time_precise.h
@@ -84,7 +84,7 @@ static void gpr_precise_clock_now(gpr_timespec *clk) {
}
#else /* GRPC_TIMERS_RDTSC */
-static void gpr_precise_clock_now(gpr_timespec* clk) {
+static void gpr_precise_clock_now(gpr_timespec *clk) {
*clk = gpr_now(GPR_CLOCK_REALTIME);
clk->clock_type = GPR_CLOCK_PRECISE;
}
diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c
index f794855429..bc0586d069 100644
--- a/src/core/support/time_win32.c
+++ b/src/core/support/time_win32.c
@@ -41,6 +41,8 @@
#include <src/core/support/time_precise.h>
#include <sys/timeb.h>
+#include "src/core/support/block_annotate.h"
+
static LARGE_INTEGER g_start_time;
static double g_time_scale;
@@ -92,7 +94,9 @@ void gpr_sleep_until(gpr_timespec until) {
delta = gpr_time_sub(until, now);
sleep_millis =
(DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS;
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
Sleep(sleep_millis);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
}
}
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index f47acc8f8d..3c0be58919 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -158,10 +158,19 @@ namespace grpc {
Status SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp) {
- GrpcBufferWriter writer(bp);
- return msg.SerializeToZeroCopyStream(&writer)
- ? Status::OK
- : Status(StatusCode::INTERNAL, "Failed to serialize message");
+ int byte_size = msg.ByteSize();
+ if (byte_size <= kMaxBufferLength) {
+ gpr_slice slice = gpr_slice_malloc(byte_size);
+ GPR_ASSERT(GPR_SLICE_END_PTR(slice) == msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
+ *bp = grpc_raw_byte_buffer_create(&slice, 1);
+ gpr_slice_unref(slice);
+ return Status::OK;
+ } else {
+ GrpcBufferWriter writer(bp);
+ return msg.SerializeToZeroCopyStream(&writer)
+ ? Status::OK
+ : Status(StatusCode::INTERNAL, "Failed to serialize message");
+ }
}
Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
diff --git a/src/cpp/server/secure_server_credentials.h b/src/cpp/server/secure_server_credentials.h
index 4f003c6b7e..5460f4a02c 100644
--- a/src/cpp/server/secure_server_credentials.h
+++ b/src/cpp/server/secure_server_credentials.h
@@ -46,7 +46,7 @@ namespace grpc {
class AuthMetadataProcessorAyncWrapper GRPC_FINAL {
public:
- static void Destroy(void *wrapper);
+ static void Destroy(void* wrapper);
static void Process(void* wrapper, grpc_auth_context* context,
const grpc_metadata* md, size_t num_md,
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index e484a9ea50..efe07f734e 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -405,22 +405,6 @@ describe GRPC::RpcServer do
t.join
end
- it 'should not receive metadata if the client times out', server: true do
- service = SlowService.new
- @srv.handle(service)
- t = Thread.new { @srv.run }
- @srv.wait_till_running
- req = EchoMsg.new
- stub = SlowStub.new(@host, **client_opts)
- timeout = 0.1 # too short for SlowService to respond
- blk = proc { stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2') }
- expect(&blk).to raise_error GRPC::BadStatus
- wanted_md = []
- expect(service.received_md).to eq(wanted_md)
- @srv.stop
- t.join
- end
-
it 'should handle cancellation correctly', server: true do
service = SlowService.new
@srv.handle(service)