diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/client_config/subchannel.h | 8 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_posix.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_windows.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.h | 8 | ||||
-rw-r--r-- | src/core/support/block_annotate.h | 44 | ||||
-rw-r--r-- | src/core/support/file.c | 6 | ||||
-rw-r--r-- | src/core/support/time_posix.c | 7 | ||||
-rw-r--r-- | src/core/support/time_precise.h | 2 | ||||
-rw-r--r-- | src/core/support/time_win32.c | 4 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.cc | 17 | ||||
-rw-r--r-- | src/cpp/server/secure_server_credentials.h | 2 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 16 |
16 files changed, 109 insertions, 32 deletions
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index c9e5861d9c..2af3ce9e56 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -64,13 +64,13 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif -void grpc_subchannel_ref( - grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_ref(grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_ref( - grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_ref(grpc_subchannel_call *call + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 5626b08a47..a4293eb4a4 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -42,6 +42,7 @@ #include <unistd.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -179,7 +180,9 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( pfds[1].events = POLLIN; pfds[1].revents = 0; + GRPC_SCHEDULING_START_BLOCKING_REGION; poll_rv = grpc_poll_function(pfds, 2, timeout_ms); + GRPC_SCHEDULING_END_BLOCKING_REGION; if (poll_rv < 0) { if (errno != EINTR) { @@ -193,6 +196,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( } if (pfds[1].revents) { do { + /* The following epoll_wait never blocks; it has a timeout of 0 */ ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); if (ep_rv < 0) { if (errno != EINTR) { diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 2a18cedb33..44031b8ef6 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -44,6 +44,7 @@ #include "src/core/iomgr/fd_posix.h" #include "src/core/iomgr/iomgr_internal.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -147,7 +148,9 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( POLLOUT, &watchers[i]); } + GRPC_SCHEDULING_START_BLOCKING_REGION; r = grpc_poll_function(pfds, pfd_count, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; for (i = 1; i < pfd_count; i++) { grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN, diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 43ad22c16d..10bab134d7 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -47,6 +47,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> @@ -468,7 +469,9 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, /* poll fd count (argument 2) is shortened by one if we have no events to poll on - such that it only includes the kicker */ + GRPC_SCHEDULING_START_BLOCKING_REGION; r = grpc_poll_function(pfd, nfds, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); if (fd) { diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index 99bd566e10..ed0a93fcc9 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -43,6 +43,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> @@ -103,13 +104,18 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ + GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); + GRPC_SCHEDULING_END_BLOCKING_REGION; + if (s != 0) { /* Retry if well-known service name is recognized */ char *svc[][2] = {{"http", "80"}, {"https", "443"}}; for (i = 0; i < GPR_ARRAY_SIZE(svc); i++) { if (strcmp(port, svc[i][0]) == 0) { + GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, svc[i][1], &hints, &result); + GRPC_SCHEDULING_END_BLOCKING_REGION; break; } } diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index fcd80b3912..82a5602996 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -42,6 +42,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> @@ -88,7 +89,9 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ + GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); + GRPC_SCHEDULING_END_BLOCKING_REGION; if (s != 0) { gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s)); goto done; diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index ae7c889d0c..9baaf1edc7 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -117,6 +117,8 @@ struct grpc_udp_server { grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; + /* The parent grpc server */ + grpc_server* grpc_server; }; grpc_udp_server *grpc_udp_server_create(void) { @@ -276,7 +278,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { /* Tell the registered callback that data is available to read. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(sp->fd); + sp->read_cb(sp->fd, sp->server->grpc_server); /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); @@ -402,11 +404,13 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { } void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, - grpc_pollset **pollsets, size_t pollset_count) { + grpc_pollset **pollsets, size_t pollset_count, + grpc_server *server) { size_t i, j; gpr_mu_lock(&s->mu); GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; + s->grpc_server = server; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd); diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index 76082d7761..b6cf3c8ed9 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -36,18 +36,22 @@ #include "src/core/iomgr/endpoint.h" +/* Forward decl of grpc_server */ +typedef struct grpc_server grpc_server; + /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; /* Called when data is available to read from the socket. */ -typedef void (*grpc_udp_server_read_cb)(int fd); +typedef void (*grpc_udp_server_read_cb)(int fd, grpc_server* server); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); /* Start listening to bound ports */ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, - grpc_pollset **pollsets, size_t pollset_count); + grpc_pollset **pollsets, size_t pollset_count, + grpc_server *server); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); diff --git a/src/core/support/block_annotate.h b/src/core/support/block_annotate.h new file mode 100644 index 0000000000..bf2c17f859 --- /dev/null +++ b/src/core/support/block_annotate.h @@ -0,0 +1,44 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H +#define GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H + +/* These annotations identify the beginning and end of regions where + the code may block for reasons other than synchronization functions. + These include poll, epoll, and getaddrinfo. */ + +#define GRPC_SCHEDULING_START_BLOCKING_REGION do {} while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION do {} while (0) + +#endif /* GRPC_INTERNAL_CORE_SUPPORT_BLOCK_ANNOTATE_H */ diff --git a/src/core/support/file.c b/src/core/support/file.c index c1361d8a9e..8c673dbcc6 100644 --- a/src/core/support/file.c +++ b/src/core/support/file.c @@ -40,6 +40,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/support/block_annotate.h" #include "src/core/support/string.h" gpr_slice gpr_load_file(const char *filename, int add_null_terminator, @@ -48,9 +49,11 @@ gpr_slice gpr_load_file(const char *filename, int add_null_terminator, size_t contents_size = 0; char *error_msg = NULL; gpr_slice result = gpr_empty_slice(); - FILE *file = fopen(filename, "rb"); + FILE *file; size_t bytes_read = 0; + GRPC_SCHEDULING_START_BLOCKING_REGION; + file = fopen(filename, "rb"); if (file == NULL) { gpr_asprintf(&error_msg, "Could not open file %s (error = %s).", filename, strerror(errno)); @@ -83,5 +86,6 @@ end: if (success != NULL) *success = 0; } if (file != NULL) fclose(file); + GRPC_SCHEDULING_END_BLOCKING_REGION; return result; } diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index dcecff0d05..eedfd0a060 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -41,6 +41,7 @@ #include <unistd.h> #include <grpc/support/log.h> #include <grpc/support/time.h> +#include "src/core/support/block_annotate.h" static struct timespec timespec_from_gpr(gpr_timespec gts) { struct timespec rv; @@ -126,6 +127,7 @@ void gpr_sleep_until(gpr_timespec until) { gpr_timespec now; gpr_timespec delta; struct timespec delta_ts; + int ns_result; for (;;) { /* We could simplify by using clock_nanosleep instead, but it might be @@ -137,7 +139,10 @@ void gpr_sleep_until(gpr_timespec until) { delta = gpr_time_sub(until, now); delta_ts = timespec_from_gpr(delta); - if (nanosleep(&delta_ts, NULL) == 0) { + GRPC_SCHEDULING_START_BLOCKING_REGION; + ns_result = nanosleep(&delta_ts, NULL); + GRPC_SCHEDULING_END_BLOCKING_REGION; + if (ns_result == 0) { break; } } diff --git a/src/core/support/time_precise.h b/src/core/support/time_precise.h index a72d37e2f1..cd201faab9 100644 --- a/src/core/support/time_precise.h +++ b/src/core/support/time_precise.h @@ -84,7 +84,7 @@ static void gpr_precise_clock_now(gpr_timespec *clk) { } #else /* GRPC_TIMERS_RDTSC */ -static void gpr_precise_clock_now(gpr_timespec* clk) { +static void gpr_precise_clock_now(gpr_timespec *clk) { *clk = gpr_now(GPR_CLOCK_REALTIME); clk->clock_type = GPR_CLOCK_PRECISE; } diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index f794855429..bc0586d069 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -41,6 +41,8 @@ #include <src/core/support/time_precise.h> #include <sys/timeb.h> +#include "src/core/support/block_annotate.h" + static LARGE_INTEGER g_start_time; static double g_time_scale; @@ -92,7 +94,9 @@ void gpr_sleep_until(gpr_timespec until) { delta = gpr_time_sub(until, now); sleep_millis = (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS; + GRPC_SCHEDULING_START_BLOCKING_REGION; Sleep(sleep_millis); + GRPC_SCHEDULING_END_BLOCKING_REGION; } } diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index f47acc8f8d..3c0be58919 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -158,10 +158,19 @@ namespace grpc { Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) { - GrpcBufferWriter writer(bp); - return msg.SerializeToZeroCopyStream(&writer) - ? Status::OK - : Status(StatusCode::INTERNAL, "Failed to serialize message"); + int byte_size = msg.ByteSize(); + if (byte_size <= kMaxBufferLength) { + gpr_slice slice = gpr_slice_malloc(byte_size); + GPR_ASSERT(GPR_SLICE_END_PTR(slice) == msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); + *bp = grpc_raw_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + return Status::OK; + } else { + GrpcBufferWriter writer(bp); + return msg.SerializeToZeroCopyStream(&writer) + ? Status::OK + : Status(StatusCode::INTERNAL, "Failed to serialize message"); + } } Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, diff --git a/src/cpp/server/secure_server_credentials.h b/src/cpp/server/secure_server_credentials.h index 4f003c6b7e..5460f4a02c 100644 --- a/src/cpp/server/secure_server_credentials.h +++ b/src/cpp/server/secure_server_credentials.h @@ -46,7 +46,7 @@ namespace grpc { class AuthMetadataProcessorAyncWrapper GRPC_FINAL { public: - static void Destroy(void *wrapper); + static void Destroy(void* wrapper); static void Process(void* wrapper, grpc_auth_context* context, const grpc_metadata* md, size_t num_md, diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index e484a9ea50..efe07f734e 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -405,22 +405,6 @@ describe GRPC::RpcServer do t.join end - it 'should not receive metadata if the client times out', server: true do - service = SlowService.new - @srv.handle(service) - t = Thread.new { @srv.run } - @srv.wait_till_running - req = EchoMsg.new - stub = SlowStub.new(@host, **client_opts) - timeout = 0.1 # too short for SlowService to respond - blk = proc { stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2') } - expect(&blk).to raise_error GRPC::BadStatus - wanted_md = [] - expect(service.received_md).to eq(wanted_md) - @srv.stop - t.join - end - it 'should handle cancellation correctly', server: true do service = SlowService.new @srv.handle(service) |