Diffstat:
-rw-r--r--     6  BUILD
-rw-r--r--     2  Makefile
-rw-r--r--     1  binding.gyp
-rw-r--r--     2  build.yaml
-rw-r--r--     1  config.m4
-rw-r--r--     2  doc/connectivity-semantics-and-api.md
-rw-r--r--     3  gRPC.podspec
-rwxr-xr-x     2  grpc.gemspec
-rw-r--r--     2  package.xml
-rw-r--r--    20  src/core/ext/transport/cronet/transport/cronet_transport.c
-rw-r--r--  1978  src/core/lib/iomgr/ev_poll_and_epoll_posix.c
-rw-r--r--    41  src/core/lib/iomgr/ev_poll_and_epoll_posix.h
-rw-r--r--     2  src/core/lib/iomgr/ev_poll_posix.c
-rw-r--r--     3  src/core/lib/iomgr/ev_posix.c
-rw-r--r--    24  src/csharp/Grpc.Core.Tests/ClientServerTest.cs
-rw-r--r--    10  src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
-rw-r--r--     8  src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
-rw-r--r--    20  src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
-rw-r--r--    39  src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
-rw-r--r--    10  src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
-rw-r--r--    13  src/csharp/Grpc.Core/Channel.cs
-rw-r--r--     1  src/csharp/Grpc.Core/Grpc.Core.csproj
-rw-r--r--    53  src/csharp/Grpc.Core/GrpcEnvironment.cs
-rw-r--r--   102  src/csharp/Grpc.Core/Internal/AsyncCall.cs
-rw-r--r--    81  src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
-rw-r--r--    36  src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
-rw-r--r--    94  src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
-rw-r--r--    26  src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
-rw-r--r--     9  src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
-rw-r--r--     8  src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
-rw-r--r--    16  src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
-rw-r--r--    68  src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
-rw-r--r--    11  src/csharp/Grpc.Core/Internal/NativeMethods.cs
-rw-r--r--    32  src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
-rw-r--r--     8  src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
-rw-r--r--    27  src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
-rw-r--r--    39  src/csharp/Grpc.Core/Server.cs
-rw-r--r--    39  src/csharp/Grpc.IntegrationTesting/InteropClient.cs
-rw-r--r--    11  src/csharp/ext/grpc_csharp_ext.c
-rw-r--r--    43  src/objective-c/CronetFramework.podspec
-rw-r--r--    55  src/objective-c/GRPCClient/GRPCCall+Cronet.h
-rw-r--r--    54  src/objective-c/GRPCClient/GRPCCall+Cronet.m
-rw-r--r--     6  src/objective-c/GRPCClient/private/GRPCChannel.h
-rw-r--r--    30  src/objective-c/GRPCClient/private/GRPCChannel.m
-rw-r--r--    13  src/objective-c/GRPCClient/private/GRPCHost.m
-rw-r--r--    16  src/objective-c/tests/InteropTests.m
-rw-r--r--     1  src/objective-c/tests/Podfile
-rw-r--r--   758  src/python/grpcio/grpc/__init__.py
-rw-r--r--     1  src/python/grpcio/grpc_core_dependencies.py
-rw-r--r--    26  src/python/grpcio/tests/unit/framework/common/test_control.py
-rw-r--r--     2  tools/doxygen/Doxyfile.core.internal
-rwxr-xr-x     4  tools/jenkins/run_full_performance.sh
-rwxr-xr-x     2  tools/run_tests/run_tests.py
-rw-r--r--     3  tools/run_tests/sources_and_headers.json
-rw-r--r--     3  vsprojects/vcxproj/grpc/grpc.vcxproj
-rw-r--r--     6  vsprojects/vcxproj/grpc/grpc.vcxproj.filters
-rw-r--r--     3  vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
-rw-r--r--     6  vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters

58 files changed, 3512 insertions(+), 370 deletions(-)
diff --git a/BUILD b/BUILD
index fd03d52e3c..7601a4ac5a 100644
--- a/BUILD
+++ b/BUILD
@@ -178,6 +178,7 @@ cc_library(
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
+ "src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
@@ -321,6 +322,7 @@ cc_library(
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
+ "src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_poll_posix.c",
"src/core/lib/iomgr/ev_posix.c",
"src/core/lib/iomgr/exec_ctx.c",
@@ -546,6 +548,7 @@ cc_library(
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
+ "src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
@@ -666,6 +669,7 @@ cc_library(
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
+ "src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_poll_posix.c",
"src/core/lib/iomgr/ev_posix.c",
"src/core/lib/iomgr/exec_ctx.c",
@@ -1358,6 +1362,7 @@ objc_library(
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
+ "src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_poll_posix.c",
"src/core/lib/iomgr/ev_posix.c",
"src/core/lib/iomgr/exec_ctx.c",
@@ -1562,6 +1567,7 @@ objc_library(
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
+ "src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
diff --git a/Makefile b/Makefile
index 817fcd072d..710f626f9e 100644
--- a/Makefile
+++ b/Makefile
@@ -2486,6 +2486,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
+ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_poll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \
@@ -2840,6 +2841,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
+ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_poll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \
diff --git a/binding.gyp b/binding.gyp
index 7b18707000..f2271cbcbd 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -581,6 +581,7 @@
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
+ 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
diff --git a/build.yaml b/build.yaml
index 429dbb3351..b76bd122bc 100644
--- a/build.yaml
+++ b/build.yaml
@@ -165,6 +165,7 @@ filegroups:
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/endpoint.h
- src/core/lib/iomgr/endpoint_pair.h
+ - src/core/lib/iomgr/ev_poll_and_epoll_posix.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/exec_ctx.h
@@ -239,6 +240,7 @@ filegroups:
- src/core/lib/iomgr/endpoint.c
- src/core/lib/iomgr/endpoint_pair_posix.c
- src/core/lib/iomgr/endpoint_pair_windows.c
+ - src/core/lib/iomgr/ev_poll_and_epoll_posix.c
- src/core/lib/iomgr/ev_poll_posix.c
- src/core/lib/iomgr/ev_posix.c
- src/core/lib/iomgr/exec_ctx.c
diff --git a/config.m4 b/config.m4
index 29df77ce1d..67c9bef293 100644
--- a/config.m4
+++ b/config.m4
@@ -100,6 +100,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
+ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_poll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \
diff --git a/doc/connectivity-semantics-and-api.md b/doc/connectivity-semantics-and-api.md
index 5427900394..cc007eaae3 100644
--- a/doc/connectivity-semantics-and-api.md
+++ b/doc/connectivity-semantics-and-api.md
@@ -101,7 +101,7 @@ corresponding reasons. Empty cells denote disallowed transitions.
<td>Shutdown triggered by application.</td>
</tr>
<tr>
- <th>FATAL_FAILURE</th>
+ <th>SHUTDOWN</th>
<td></td>
<td></td>
<td></td>
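
Context for the rename above: the core API's terminal connectivity state GRPC_CHANNEL_FATAL_FAILURE became GRPC_CHANNEL_SHUTDOWN, and this doc change brings the state table in line. As a hedged illustration (not part of this commit, using the public C core API as I understand it; `channel` and `cq` are assumed to be created elsewhere), a caller can observe the terminal state like this:

```c
#include <grpc/grpc.h>

/* Hedged sketch: block until `channel` reaches the terminal SHUTDOWN
   state (formerly FATAL_FAILURE). Assumes channel/cq were created via
   e.g. grpc_insecure_channel_create()/grpc_completion_queue_create(). */
static void wait_for_shutdown(grpc_channel *channel,
                              grpc_completion_queue *cq) {
  grpc_connectivity_state state =
      grpc_channel_check_connectivity_state(channel, 0 /* try_to_connect */);
  while (state != GRPC_CHANNEL_SHUTDOWN) {
    /* ask for a completion on cq when the state leaves `state` */
    grpc_channel_watch_connectivity_state(
        channel, state, gpr_inf_future(GPR_CLOCK_REALTIME), cq, NULL);
    (void)grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
                                     NULL);
    state = grpc_channel_check_connectivity_state(channel, 0);
  }
}
```
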
diff --git a/gRPC.podspec b/gRPC.podspec
index cc8ba3de94..436cec4ba2 100644
--- a/gRPC.podspec
+++ b/gRPC.podspec
@@ -181,6 +181,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
+ 'src/core/lib/iomgr/ev_poll_and_epoll_posix.h',
'src/core/lib/iomgr/ev_poll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/exec_ctx.h',
@@ -358,6 +359,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
+ 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
@@ -546,6 +548,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
+ 'src/core/lib/iomgr/ev_poll_and_epoll_posix.h',
'src/core/lib/iomgr/ev_poll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/exec_ctx.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index ae7f9b7d2e..55ec9a98c3 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -190,6 +190,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/closure.h )
s.files += %w( src/core/lib/iomgr/endpoint.h )
s.files += %w( src/core/lib/iomgr/endpoint_pair.h )
+ s.files += %w( src/core/lib/iomgr/ev_poll_and_epoll_posix.h )
s.files += %w( src/core/lib/iomgr/ev_poll_posix.h )
s.files += %w( src/core/lib/iomgr/ev_posix.h )
s.files += %w( src/core/lib/iomgr/exec_ctx.h )
@@ -337,6 +338,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/endpoint.c )
s.files += %w( src/core/lib/iomgr/endpoint_pair_posix.c )
s.files += %w( src/core/lib/iomgr/endpoint_pair_windows.c )
+ s.files += %w( src/core/lib/iomgr/ev_poll_and_epoll_posix.c )
s.files += %w( src/core/lib/iomgr/ev_poll_posix.c )
s.files += %w( src/core/lib/iomgr/ev_posix.c )
s.files += %w( src/core/lib/iomgr/exec_ctx.c )
diff --git a/package.xml b/package.xml
index 507a2a7ed6..c7fbaf5eb2 100644
--- a/package.xml
+++ b/package.xml
@@ -197,6 +197,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/closure.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair.h" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/iomgr/ev_poll_and_epoll_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_poll_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.h" role="src" />
@@ -344,6 +345,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_windows.c" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/iomgr/ev_poll_and_epoll_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_poll_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.c" role="src" />
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 5bb085195c..df160aaaf6 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -218,8 +218,11 @@ static void on_write_completed(cronet_bidirectional_stream *stream,
static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
- memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
- gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+ if (s->total_read_bytes > 0) {
+ // Only copy if there is a non-zero number of bytes
+ memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
+ gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+ }
grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
*s->recv_message = (grpc_byte_buffer *)&s->sbs;
}
@@ -347,8 +350,17 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) {
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
}
- cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
- s->remaining_read_bytes);
+ if (s->remaining_read_bytes > 0) {
+ cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
+ s->remaining_read_bytes);
+ } else {
+ // Calling the closing callback directly since this is a 0 byte read
+ // for an empty message.
+ process_recv_message(s, NULL);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ invoke_closing_callback(s);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ }
}
}
break;
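
The guards in the hunks above matter because recv_data may be NULL for a zero-byte read (an empty message), and memcpy with a NULL source is undefined behavior in C even when the length is zero. A minimal standalone sketch of the pattern (plain C, names hypothetical, not grpc API):

```c
#include <stddef.h>
#include <string.h>

/* Hedged sketch of the zero-length guard: empty payloads must skip the
   copy (and the slice append) entirely, while the byte-buffer stream is
   still initialized for the reader. */
static void fill_read_buffer(unsigned char *dst, const unsigned char *src,
                             size_t len) {
  if (len > 0) {
    memcpy(dst, src, len); /* only touch src for non-empty messages */
    /* ...hand the filled slice to the reader here... */
  }
  /* len == 0: nothing to copy; the stream is still set up, just empty */
}
```
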
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
new file mode 100644
index 0000000000..943c404f91
--- /dev/null
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -0,0 +1,1978 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* This file will be removed shortly: it's here to keep refactoring
+ * steps simple and auditable.
+ * It's the combination of the old files:
+ * - fd_posix.{h,c}
+ * - pollset_posix.{h,c}
+ * - pollset_multipoller_with_{poll,epoll}.{h,c}
+ * The new version will be split into:
+ * - ev_poll_posix.{h,c}
+ * - ev_epoll_posix.{h,c}
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_SOCKET
+
+#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/tls.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/block_annotate.h"
+
+/*******************************************************************************
+ * FD declarations
+ */
+
+typedef struct grpc_fd_watcher {
+ struct grpc_fd_watcher *next;
+ struct grpc_fd_watcher *prev;
+ grpc_pollset *pollset;
+ grpc_pollset_worker *worker;
+ grpc_fd *fd;
+} grpc_fd_watcher;
+
+struct grpc_fd {
+ int fd;
+ /* refst format:
+ bit0: 1=active/0=orphaned
+ bit1-n: refcount
+ meaning that mostly we ref by two to avoid altering the orphaned bit,
+ and just unref by 1 when we're ready to flag the object as orphaned */
+ gpr_atm refst;
+
+ gpr_mu mu;
+ int shutdown;
+ int closed;
+ int released;
+
+ /* The watcher list.
+
+ The following watcher related fields are protected by watcher_mu.
+
+ An fd_watcher is an ephemeral object created when an fd wants to
+ begin polling, and destroyed after the poll.
+
+ It records whether the fd should be polled for reads, writes, both,
+ or neither.
+
+ If a watcher is asked to poll for reads or writes, the read_watcher
+ or write_watcher fields are set respectively. A watcher may be asked
+ to poll for both, in which case both fields will be set.
+
+ read_watcher and write_watcher may be NULL if no watcher has been
+ asked to poll for reads or writes.
+
+ If an fd_watcher is not asked to poll for reads or writes, it's added
+ to a linked list of inactive watchers, rooted at inactive_watcher_root.
+ If a poller is needed at a later time, one of the inactive watchers
+ may be kicked out of its poll loop to take on that responsibility. */
+ grpc_fd_watcher inactive_watcher_root;
+ grpc_fd_watcher *read_watcher;
+ grpc_fd_watcher *write_watcher;
+
+ grpc_closure *read_closure;
+ grpc_closure *write_closure;
+
+ struct grpc_fd *freelist_next;
+
+ grpc_closure *on_done_closure;
+
+ grpc_iomgr_object iomgr_object;
+
+ /* The pollset that last noticed and notified that the fd is readable */
+ grpc_pollset *read_notifier_pollset;
+};
+
+/* Begin polling on an fd.
+ Registers that the given pollset is interested in this fd - so that if read
+ or writability interest changes, the pollset can be kicked to pick up that
+ new interest.
+ Return value is:
+ (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
+ i.e. a combination of read_mask and write_mask determined by the fd's current
+ interest in said events.
+ Polling strategies that do not need to alter their behavior depending on the
+ fd's current interest (such as epoll) do not need to call this function.
+ MUST NOT be called with a pollset lock taken */
+static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, uint32_t read_mask,
+ uint32_t write_mask, grpc_fd_watcher *rec);
+/* Complete polling previously started with fd_begin_poll
+ MUST NOT be called with a pollset lock taken
+ if got_read or got_write are 1, also does the become_{readable,writable} as
+ appropriate. */
+static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset);
+
+/* Return 1 if this fd is orphaned, 0 otherwise */
+static bool fd_is_orphaned(grpc_fd *fd);
+
+/* Reference counting for fds */
+/*#define GRPC_FD_REF_COUNT_DEBUG*/
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
+static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line);
+#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
+#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
+#else
+static void fd_ref(grpc_fd *fd);
+static void fd_unref(grpc_fd *fd);
+#define GRPC_FD_REF(fd, reason) fd_ref(fd)
+#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
+#endif
+
+static void fd_global_init(void);
+static void fd_global_shutdown(void);
+
+#define CLOSURE_NOT_READY ((grpc_closure *)0)
+#define CLOSURE_READY ((grpc_closure *)1)
+
+/*******************************************************************************
+ * pollset declarations
+ */
+
+typedef struct grpc_pollset_vtable grpc_pollset_vtable;
+
+typedef struct grpc_cached_wakeup_fd {
+ grpc_wakeup_fd fd;
+ struct grpc_cached_wakeup_fd *next;
+} grpc_cached_wakeup_fd;
+
+struct grpc_pollset_worker {
+ grpc_cached_wakeup_fd *wakeup_fd;
+ int reevaluate_polling_on_wakeup;
+ int kicked_specifically;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+};
+
+struct grpc_pollset {
+ /* pollsets under posix can mutate representation as fds are added and
+ removed.
+ For example, we may choose a poll() based implementation on linux for
+ a few fds, and an epoll() based implementation for many fds */
+ const grpc_pollset_vtable *vtable;
+ gpr_mu mu;
+ grpc_pollset_worker root_worker;
+ int in_flight_cbs;
+ int shutting_down;
+ int called_shutdown;
+ int kicked_without_pollers;
+ grpc_closure *shutdown_done;
+ grpc_closure_list idle_jobs;
+ union {
+ int fd;
+ void *ptr;
+ } data;
+ /* Local cache of eventfds for workers */
+ grpc_cached_wakeup_fd *local_wakeup_cache;
+};
+
+struct grpc_pollset_vtable {
+ void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ struct grpc_fd *fd, int and_unlock_pollset);
+ void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now);
+ void (*finish_shutdown)(grpc_pollset *pollset);
+ void (*destroy)(grpc_pollset *pollset);
+};
+
+/* Add an fd to a pollset */
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ struct grpc_fd *fd);
+
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd);
+
+/* Convert a timespec to milliseconds:
+ - very small or negative poll times are clamped to zero to do a
+ non-blocking poll (which becomes spin polling)
+ - other small values are rounded up to one millisecond
+ - timeouts longer than a millisecond are rounded up to the next whole
+ millisecond to avoid spinning
+ - infinite timeouts are converted to -1 */
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now);
+
+/* Allow kick to wakeup the currently polling worker */
+#define GRPC_POLLSET_CAN_KICK_SELF 1
+/* Force the wakee to repoll when awoken */
+#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
+/* As per pollset_kick, with an extended set of flags (defined above)
+ -- mostly for fd_posix's use. */
+static void pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags);
+
+/* turn a pollset into a multipoller: platform specific */
+typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ struct grpc_fd **fds,
+ size_t fd_count);
+static platform_become_multipoller_type platform_become_multipoller;
+
+/* Return 1 if the pollset has active threads in pollset_work (pollset must
+ * be locked) */
+static int pollset_has_workers(grpc_pollset *pollset);
+
+static void remove_fd_from_all_epoll_sets(int fd);
+
+/*******************************************************************************
+ * pollset_set definitions
+ */
+
+struct grpc_pollset_set {
+ gpr_mu mu;
+
+ size_t pollset_count;
+ size_t pollset_capacity;
+ grpc_pollset **pollsets;
+
+ size_t pollset_set_count;
+ size_t pollset_set_capacity;
+ struct grpc_pollset_set **pollset_sets;
+
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+};
+
+/*******************************************************************************
+ * fd_posix.c
+ */
+
+/* We need to keep a freelist not because of any concern about malloc
+ * performance but so that implementations with multiple threads in (for
+ * example) epoll_wait deal with the race between pollset removal and
+ * incoming poll notifications.
+ *
+ * The problem is that the poller ultimately holds a reference to this
+ * object, so it is very difficult to know when it is safe to free it, at
+ * least without some expensive synchronization.
+ *
+ * If we keep the object freelisted, in the worst case losing this race just
+ * becomes a spurious read notification on a reused fd.
+ */
+/* TODO(klempner): We could use some form of polling generation count to know
+ * when these are safe to free. */
+/* TODO(klempner): Consider disabling freelisting if we don't have multiple
+ * threads in poll on the same fd */
+/* TODO(klempner): Batch these allocations to reduce fragmentation */
+static grpc_fd *fd_freelist = NULL;
+static gpr_mu fd_freelist_mu;
+
+static void freelist_fd(grpc_fd *fd) {
+ gpr_mu_lock(&fd_freelist_mu);
+ fd->freelist_next = fd_freelist;
+ fd_freelist = fd;
+ grpc_iomgr_unregister_object(&fd->iomgr_object);
+ gpr_mu_unlock(&fd_freelist_mu);
+}
+
+static grpc_fd *alloc_fd(int fd) {
+ grpc_fd *r = NULL;
+ gpr_mu_lock(&fd_freelist_mu);
+ if (fd_freelist != NULL) {
+ r = fd_freelist;
+ fd_freelist = fd_freelist->freelist_next;
+ }
+ gpr_mu_unlock(&fd_freelist_mu);
+ if (r == NULL) {
+ r = gpr_malloc(sizeof(grpc_fd));
+ gpr_mu_init(&r->mu);
+ }
+
+ gpr_mu_lock(&r->mu);
+ gpr_atm_rel_store(&r->refst, 1);
+ r->shutdown = 0;
+ r->read_closure = CLOSURE_NOT_READY;
+ r->write_closure = CLOSURE_NOT_READY;
+ r->fd = fd;
+ r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
+ &r->inactive_watcher_root;
+ r->freelist_next = NULL;
+ r->read_watcher = r->write_watcher = NULL;
+ r->on_done_closure = NULL;
+ r->closed = 0;
+ r->released = 0;
+ r->read_notifier_pollset = NULL;
+ gpr_mu_unlock(&r->mu);
+ return r;
+}
+
+static void destroy(grpc_fd *fd) {
+ gpr_mu_destroy(&fd->mu);
+ gpr_free(fd);
+}
+
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
+#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
+static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
+ gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ gpr_atm_no_barrier_load(&fd->refst),
+ gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
+#else
+#define REF_BY(fd, n, reason) ref_by(fd, n)
+#define UNREF_BY(fd, n, reason) unref_by(fd, n)
+static void ref_by(grpc_fd *fd, int n) {
+#endif
+ GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
+}
+
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
+ gpr_atm old;
+ gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ gpr_atm_no_barrier_load(&fd->refst),
+ gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
+#else
+static void unref_by(grpc_fd *fd, int n) {
+ gpr_atm old;
+#endif
+ old = gpr_atm_full_fetch_add(&fd->refst, -n);
+ if (old == n) {
+ freelist_fd(fd);
+ } else {
+ GPR_ASSERT(old > n);
+ }
+}
+
+static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
+
+static void fd_global_shutdown(void) {
+ gpr_mu_lock(&fd_freelist_mu);
+ gpr_mu_unlock(&fd_freelist_mu);
+ while (fd_freelist != NULL) {
+ grpc_fd *fd = fd_freelist;
+ fd_freelist = fd_freelist->freelist_next;
+ destroy(fd);
+ }
+ gpr_mu_destroy(&fd_freelist_mu);
+}
+
+static grpc_fd *fd_create(int fd, const char *name) {
+ grpc_fd *r = alloc_fd(fd);
+ char *name2;
+ gpr_asprintf(&name2, "%s fd=%d", name, fd);
+ grpc_iomgr_register_object(&r->iomgr_object, name2);
+ gpr_free(name2);
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+ gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
+#endif
+ return r;
+}
+
+static bool fd_is_orphaned(grpc_fd *fd) {
+ return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
+}
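
To make the refst scheme above concrete, here is a hedged standalone trace of the bit manipulation (plain C, mirroring ref_by/unref_by/fd_orphan; not code from this file):

```c
#include <assert.h>

int main(void) {
  long refst = 1;           /* fd_create: active bit set, count "one"    */
  refst += 2;               /* GRPC_FD_REF("poll"): bit 0 untouched      */
  assert((refst & 1) == 1); /* fd_is_orphaned() -> false                 */
  refst += 1;               /* fd_orphan: REF_BY(fd, 1) converts the     */
  assert((refst & 1) == 0); /*   active bit into an ordinary reference,  */
                            /*   so the fd now reads as orphaned         */
  refst -= 2;               /* fd_orphan: UNREF_BY(fd, 2)                */
  refst -= 2;               /* GRPC_FD_UNREF("poll"): count hits zero    */
  assert(refst == 0);       /* unref_by() freelists the fd at this point */
  return 0;
}
```
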
+
+static void pollset_kick_locked(grpc_fd_watcher *watcher) {
+ gpr_mu_lock(&watcher->pollset->mu);
+ GPR_ASSERT(watcher->worker);
+ pollset_kick_ext(watcher->pollset, watcher->worker,
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+ gpr_mu_unlock(&watcher->pollset->mu);
+}
+
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+ if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
+ pollset_kick_locked(fd->inactive_watcher_root.next);
+ } else if (fd->read_watcher) {
+ pollset_kick_locked(fd->read_watcher);
+ } else if (fd->write_watcher) {
+ pollset_kick_locked(fd->write_watcher);
+ }
+}
+
+static void wake_all_watchers_locked(grpc_fd *fd) {
+ grpc_fd_watcher *watcher;
+ for (watcher = fd->inactive_watcher_root.next;
+ watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
+ pollset_kick_locked(watcher);
+ }
+ if (fd->read_watcher) {
+ pollset_kick_locked(fd->read_watcher);
+ }
+ if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
+ pollset_kick_locked(fd->write_watcher);
+ }
+}
+
+static int has_watchers(grpc_fd *fd) {
+ return fd->read_watcher != NULL || fd->write_watcher != NULL ||
+ fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
+}
+
+static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ fd->closed = 1;
+ if (!fd->released) {
+ close(fd->fd);
+ } else {
+ remove_fd_from_all_epoll_sets(fd->fd);
+ }
+ grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
+}
+
+static int fd_wrapped_fd(grpc_fd *fd) {
+ if (fd->released || fd->closed) {
+ return -1;
+ } else {
+ return fd->fd;
+ }
+}
+
+static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *on_done, int *release_fd,
+ const char *reason) {
+ fd->on_done_closure = on_done;
+ fd->released = release_fd != NULL;
+ if (!fd->released) {
+ shutdown(fd->fd, SHUT_RDWR);
+ } else {
+ *release_fd = fd->fd;
+ }
+ gpr_mu_lock(&fd->mu);
+ REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
+ if (!has_watchers(fd)) {
+ close_fd_locked(exec_ctx, fd);
+ } else {
+ wake_all_watchers_locked(fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+ UNREF_BY(fd, 2, reason); /* drop the reference */
+}
+
+/* increment refcount by two to avoid changing the orphan bit */
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ ref_by(fd, 2, reason, file, line);
+}
+
+static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ unref_by(fd, 2, reason, file, line);
+}
+#else
+static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
+
+static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
+#endif
+
+static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st, grpc_closure *closure) {
+ if (*st == CLOSURE_NOT_READY) {
+ /* not ready ==> switch to a waiting state by setting the closure */
+ *st = closure;
+ } else if (*st == CLOSURE_READY) {
+ /* already ready ==> queue the closure to run immediately */
+ *st = CLOSURE_NOT_READY;
+ grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
+ maybe_wake_one_watcher_locked(fd);
+ } else {
+ /* upcallptr was set to a different closure. This is an error! */
+ gpr_log(GPR_ERROR,
+ "User called a notify_on function with a previous callback still "
+ "pending");
+ abort();
+ }
+}
+
+/* returns 1 if state becomes not ready */
+static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st) {
+ if (*st == CLOSURE_READY) {
+ /* duplicate ready ==> ignore */
+ return 0;
+ } else if (*st == CLOSURE_NOT_READY) {
+ /* not ready, and not waiting ==> flag ready */
+ *st = CLOSURE_READY;
+ return 0;
+ } else {
+ /* waiting ==> queue closure */
+ grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
+ *st = CLOSURE_NOT_READY;
+ return 1;
+ }
+}
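
A hedged standalone model of the three-state slot driven by notify_on_locked() and set_ready_locked() above: the slot holds CLOSURE_NOT_READY, CLOSURE_READY, or a real closure pointer. `run` stands in for grpc_exec_ctx_enqueue and is hypothetical.

```c
typedef struct closure closure; /* opaque; only pointers are compared */
#define NOT_READY ((closure *)0)
#define READY ((closure *)1)

static void run(closure *c) { (void)c; /* enqueue on the exec_ctx */ }

/* I/O not ready yet: park the callback. Event already happened: run now. */
static void notify_on(closure **st, closure *c) {
  if (*st == NOT_READY) {
    *st = c;        /* wait for set_ready() */
  } else if (*st == READY) {
    *st = NOT_READY;
    run(c);         /* the event raced ahead of the registration */
  } /* else: two callbacks registered on one slot -- caller bug, abort() */
}

/* Event fired: either remember it or deliver the parked callback. */
static void set_ready(closure **st) {
  if (*st == NOT_READY) {
    *st = READY;    /* remember for a future notify_on() */
  } else if (*st != READY) {
    run(*st);       /* deliver the waiting closure */
    *st = NOT_READY;
  } /* duplicate READY: ignore */
}
```
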
+
+static void set_read_notifier_pollset_locked(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
+ fd->read_notifier_pollset = read_notifier_pollset;
+}
+
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ GPR_ASSERT(!fd->shutdown);
+ fd->shutdown = 1;
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *closure) {
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *closure) {
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+/* Return the read-notifier pollset */
+static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd) {
+ grpc_pollset *notifier = NULL;
+
+ gpr_mu_lock(&fd->mu);
+ notifier = fd->read_notifier_pollset;
+ gpr_mu_unlock(&fd->mu);
+
+ return notifier;
+}
+
+static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, uint32_t read_mask,
+ uint32_t write_mask, grpc_fd_watcher *watcher) {
+ uint32_t mask = 0;
+ grpc_closure *cur;
+ int requested;
+ /* keep track of pollers that have requested our events, in case they change
+ */
+ GRPC_FD_REF(fd, "poll");
+
+ gpr_mu_lock(&fd->mu);
+
+ /* if we are shutdown, then don't add to the watcher set */
+ if (fd->shutdown) {
+ watcher->fd = NULL;
+ watcher->pollset = NULL;
+ watcher->worker = NULL;
+ gpr_mu_unlock(&fd->mu);
+ GRPC_FD_UNREF(fd, "poll");
+ return 0;
+ }
+
+ /* if there is nobody polling for read, but we need to, then start doing so */
+ cur = fd->read_closure;
+ requested = cur != CLOSURE_READY;
+ if (read_mask && fd->read_watcher == NULL && requested) {
+ fd->read_watcher = watcher;
+ mask |= read_mask;
+ }
+ /* if there is nobody polling for write, but we need to, then start doing so
+ */
+ cur = fd->write_closure;
+ requested = cur != CLOSURE_READY;
+ if (write_mask && fd->write_watcher == NULL && requested) {
+ fd->write_watcher = watcher;
+ mask |= write_mask;
+ }
+ /* if not polling, remember this watcher in case we need someone to later */
+ if (mask == 0 && worker != NULL) {
+ watcher->next = &fd->inactive_watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ }
+ watcher->pollset = pollset;
+ watcher->worker = worker;
+ watcher->fd = fd;
+ gpr_mu_unlock(&fd->mu);
+
+ return mask;
+}
+
+static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset) {
+ int was_polling = 0;
+ int kick = 0;
+ grpc_fd *fd = watcher->fd;
+
+ if (fd == NULL) {
+ return;
+ }
+
+ gpr_mu_lock(&fd->mu);
+
+ if (watcher == fd->read_watcher) {
+ /* remove read watcher, kick if we still need a read */
+ was_polling = 1;
+ if (!got_read) {
+ kick = 1;
+ }
+ fd->read_watcher = NULL;
+ }
+ if (watcher == fd->write_watcher) {
+ /* remove write watcher, kick if we still need a write */
+ was_polling = 1;
+ if (!got_write) {
+ kick = 1;
+ }
+ fd->write_watcher = NULL;
+ }
+ if (!was_polling && watcher->worker != NULL) {
+ /* remove from inactive list */
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ }
+ if (got_read) {
+ if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
+ kick = 1;
+ }
+
+ if (read_notifier_pollset != NULL) {
+ set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+ }
+ }
+ if (got_write) {
+ if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
+ kick = 1;
+ }
+ }
+ if (kick) {
+ maybe_wake_one_watcher_locked(fd);
+ }
+ if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
+ close_fd_locked(exec_ctx, fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+
+ GRPC_FD_UNREF(fd, "poll");
+}
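
Putting fd_begin_poll() and fd_end_poll() together, a hedged usage sketch of the contract (simplified from basic_pollset_maybe_work_and_unlock later in this file, not new code in this commit): begin before poll(), end exactly once afterwards with the events that actually fired.

```c
static void poll_one_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                        grpc_pollset_worker *worker, grpc_fd *fd,
                        int timeout_ms) {
  grpc_fd_watcher watcher;
  struct pollfd pfd;
  pfd.fd = fd->fd;
  pfd.revents = 0;
  pfd.events =
      (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &watcher);
  if (pfd.events != 0) {
    int r = grpc_poll_function(&pfd, 1, timeout_ms);
    fd_end_poll(exec_ctx, &watcher, r > 0 && (pfd.revents & POLLIN),
                r > 0 && (pfd.revents & POLLOUT), pollset);
  } else {
    /* the watcher may sit on the inactive list; end_poll is still
       required to unlink it and drop the "poll" ref */
    fd_end_poll(exec_ctx, &watcher, 0, 0, NULL);
  }
}
```
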
+
+/*******************************************************************************
+ * pollset_posix.c
+ */
+
+GPR_TLS_DECL(g_current_thread_poller);
+GPR_TLS_DECL(g_current_thread_worker);
+
+/** The alarm system needs to be able to wake up 'some poller' sometimes
+ * (specifically when a new alarm needs to be triggered earlier than the next
+ * alarm 'epoch').
+ * This wakeup_fd gives us something to alert on when such a case occurs. */
+grpc_wakeup_fd grpc_global_wakeup_fd;
+
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+static int pollset_has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
+}
+
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (pollset_has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ } else {
+ return NULL;
+ }
+}
+
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
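
The worker list above is an intrusive circular list whose root_worker acts as a sentinel; a hedged standalone sketch of why that removes every NULL check:

```c
typedef struct node { struct node *next, *prev; } node;

static void list_init(node *root) { root->next = root->prev = root; }
static int list_empty(node *root) { return root->next == root; }

/* mirrors push_front_worker(): no NULL branches, even on an empty list */
static void push_front(node *root, node *n) {
  n->prev = root;
  n->next = root->next;
  n->prev->next = n->next->prev = n;
}

/* mirrors remove_worker(): unlinking never special-cases the list ends */
static void remove_node(node *n) {
  n->prev->next = n->next;
  n->next->prev = n->prev;
}
```
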
+
+static void pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags) {
+ GPR_TIMER_BEGIN("pollset_kick_ext", 0);
+
+ /* pollset->mu already held */
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ p->kicked_without_pollers = 1;
+ GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
+ } else if (gpr_tls_get(&g_current_thread_worker) !=
+ (intptr_t)specific_worker) {
+ GPR_TIMER_MARK("different_thread_worker", 0);
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
+ specific_worker->kicked_specifically = 1;
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
+ GPR_TIMER_MARK("kick_yoself", 0);
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
+ specific_worker->kicked_specifically = 1;
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
+ GPR_TIMER_MARK("kick_anonymous", 0);
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
+ /* Prefer not to kick self. Push the worker to the end of the list and
+ * pop the one from the front */
+ GPR_TIMER_MARK("kick_anonymous_not_self", 0);
+ push_back_worker(p, specific_worker);
+ specific_worker = pop_front_worker(p);
+ /* If there was only one worker on the pollset, we would get the same
+ * worker we pushed (the one set on current thread local) back. If so,
+ * kick it only if GRPC_POLLSET_CAN_KICK_SELF flag is set */
+ if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
+ gpr_tls_get(&g_current_thread_worker) ==
+ (intptr_t)specific_worker) {
+ push_back_worker(p, specific_worker);
+ specific_worker = NULL;
+ }
+ }
+ if (specific_worker != NULL) {
+ GPR_TIMER_MARK("finally_kick", 0);
+ push_back_worker(p, specific_worker);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ } else {
+ GPR_TIMER_MARK("kicked_no_pollers", 0);
+ p->kicked_without_pollers = 1;
+ }
+ }
+
+ GPR_TIMER_END("pollset_kick_ext", 0);
+}
+
+static void pollset_kick(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker) {
+ pollset_kick_ext(p, specific_worker, 0);
+}
+
+/* global state management */
+
+static void pollset_global_init(void) {
+ gpr_tls_init(&g_current_thread_poller);
+ gpr_tls_init(&g_current_thread_worker);
+ grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+}
+
+static void pollset_global_shutdown(void) {
+ grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ gpr_tls_destroy(&g_current_thread_poller);
+ gpr_tls_destroy(&g_current_thread_worker);
+}
+
+static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
+
+/* main interface */
+
+static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
+
+static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+ gpr_mu_init(&pollset->mu);
+ *mu = &pollset->mu;
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
+ pollset->in_flight_cbs = 0;
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
+ pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
+ pollset->local_wakeup_cache = NULL;
+ pollset->kicked_without_pollers = 0;
+ become_basic_pollset(pollset, NULL);
+}
+
+static void pollset_destroy(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ pollset->vtable->destroy(pollset);
+ while (pollset->local_wakeup_cache) {
+ grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
+ grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
+ gpr_free(pollset->local_wakeup_cache);
+ pollset->local_wakeup_cache = next;
+ }
+ gpr_mu_destroy(&pollset->mu);
+}
+
+static void pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ pollset->vtable->destroy(pollset);
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
+ become_basic_pollset(pollset, NULL);
+}
+
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
+ gpr_mu_lock(&pollset->mu);
+ pollset->vtable->add_fd(exec_ctx, pollset, fd, 1);
+/* the following (enabled only in debug) will reacquire and then release
+ our lock - meaning that if the unlocking flag passed to add_fd above is
+ not respected, the code will deadlock (in a way that we have a chance of
+ debugging) */
+#ifndef NDEBUG
+ gpr_mu_lock(&pollset->mu);
+ gpr_mu_unlock(&pollset->mu);
+#endif
+}
+
+static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
+ GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
+ pollset->vtable->finish_shutdown(pollset);
+ grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
+}
+
+static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker_hdl, gpr_timespec now,
+ gpr_timespec deadline) {
+ grpc_pollset_worker worker;
+ *worker_hdl = &worker;
+
+ /* pollset->mu already held */
+ int added_worker = 0;
+ int locked = 1;
+ int queued_work = 0;
+ int keep_polling = 0;
+ GPR_TIMER_BEGIN("pollset_work", 0);
+ /* this must happen before we (potentially) drop pollset->mu */
+ worker.next = worker.prev = NULL;
+ worker.reevaluate_polling_on_wakeup = 0;
+ if (pollset->local_wakeup_cache != NULL) {
+ worker.wakeup_fd = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd->next;
+ } else {
+ worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
+ grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ }
+ worker.kicked_specifically = 0;
+ /* If there's work waiting for the pollset to be idle, and the
+ pollset is idle, then do that work */
+ if (!pollset_has_workers(pollset) &&
+ !grpc_closure_list_empty(pollset->idle_jobs)) {
+ GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ goto done;
+ }
+ /* If we're shutting down then we don't execute any extended work */
+ if (pollset->shutting_down) {
+ GPR_TIMER_MARK("pollset_work.shutting_down", 0);
+ goto done;
+ }
+ /* Give do_promote priority so we don't starve it out */
+ if (pollset->in_flight_cbs) {
+ GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0);
+ gpr_mu_unlock(&pollset->mu);
+ locked = 0;
+ goto done;
+ }
+ /* Start polling, and keep doing so while we're being asked to
+ re-evaluate our pollers (this allows poll() based pollers to
+ ensure they don't miss wakeups) */
+ keep_polling = 1;
+ while (keep_polling) {
+ keep_polling = 0;
+ if (!pollset->kicked_without_pollers) {
+ if (!added_worker) {
+ push_front_worker(pollset, &worker);
+ added_worker = 1;
+ gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
+ }
+ gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
+ GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
+ pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker,
+ deadline, now);
+ GPR_TIMER_END("maybe_work_and_unlock", 0);
+ locked = 0;
+ gpr_tls_set(&g_current_thread_poller, 0);
+ } else {
+ GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
+ pollset->kicked_without_pollers = 0;
+ }
+ /* Finished execution - start cleaning up.
+ Note that we may arrive here from outside the enclosing while() loop.
+ In that case we won't loop, though, as we haven't added the worker to
+ the worker list, which means nobody could ask us to re-evaluate polling. */
+ done:
+ if (!locked) {
+ queued_work |= grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ locked = 1;
+ }
+ /* If we're forced to re-evaluate polling (via pollset_kick with
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
+ a loop */
+ if (worker.reevaluate_polling_on_wakeup) {
+ worker.reevaluate_polling_on_wakeup = 0;
+ pollset->kicked_without_pollers = 0;
+ if (queued_work || worker.kicked_specifically) {
+ /* If there's queued work on the list, then set the deadline to be
+ immediate so we get back out of the polling loop quickly */
+ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ }
+ keep_polling = 1;
+ }
+ }
+ if (added_worker) {
+ remove_worker(pollset, &worker);
+ gpr_tls_set(&g_current_thread_worker, 0);
+ }
+ /* release wakeup fd to the local pool */
+ worker.wakeup_fd->next = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd;
+ /* check shutdown conditions */
+ if (pollset->shutting_down) {
+ if (pollset_has_workers(pollset)) {
+ pollset_kick(pollset, NULL);
+ } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
+ pollset->called_shutdown = 1;
+ gpr_mu_unlock(&pollset->mu);
+ finish_shutdown(exec_ctx, pollset);
+ grpc_exec_ctx_flush(exec_ctx);
+ /* Continuing to access pollset here is safe -- it is the caller's
+ * responsibility to not destroy when it has outstanding calls to
+ * pollset_work.
+ * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
+ gpr_mu_lock(&pollset->mu);
+ } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ gpr_mu_unlock(&pollset->mu);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ }
+ }
+ *worker_hdl = NULL;
+ GPR_TIMER_END("pollset_work", 0);
+}
+
+static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure) {
+ GPR_ASSERT(!pollset->shutting_down);
+ pollset->shutting_down = 1;
+ pollset->shutdown_done = closure;
+ pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
+ if (!pollset_has_workers(pollset)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ }
+ if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
+ !pollset_has_workers(pollset)) {
+ pollset->called_shutdown = 1;
+ finish_shutdown(exec_ctx, pollset);
+ }
+}
+
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now) {
+ gpr_timespec timeout;
+ static const int64_t max_spin_polling_us = 10;
+ if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
+ return -1;
+ }
+ if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
+ max_spin_polling_us,
+ GPR_TIMESPAN))) <= 0) {
+ return 0;
+ }
+ timeout = gpr_time_sub(deadline, now);
+ return gpr_time_to_millis(gpr_time_add(
+ timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
+}
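
A hedged standalone analogue of the conversion above (plain C, no gpr types; constants illustrative). Rounding is always up, so poll() never wakes before the deadline and then spins; an infinite deadline maps to -1 (block forever) in the real code.

```c
#include <stdint.h>

static int remaining_ns_to_millis(int64_t remaining_ns) {
  const int64_t NS_PER_MS = 1000 * 1000;
  if (remaining_ns <= 10 * 1000) return 0; /* <=10us: non-blocking poll */
  return (int)((remaining_ns + NS_PER_MS - 1) / NS_PER_MS); /* round up */
}
/* remaining_ns_to_millis(250 * 1000) == 1   (250us rounds up to 1ms)
   remaining_ns_to_millis(4200000)    == 5   (4.2ms rounds up to 5ms)   */
```
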
+
+/*
+ * basic_pollset - a vtable that provides polling for zero or one file
+ * descriptor via poll()
+ */
+
+typedef struct grpc_unary_promote_args {
+ const grpc_pollset_vtable *original_vtable;
+ grpc_pollset *pollset;
+ grpc_fd *fd;
+ grpc_closure promotion_closure;
+} grpc_unary_promote_args;
+
+static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
+ bool success) {
+ grpc_unary_promote_args *up_args = args;
+ const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
+ grpc_pollset *pollset = up_args->pollset;
+ grpc_fd *fd = up_args->fd;
+
+ /*
+ * This is quite tricky. There are a number of cases to keep in mind here:
+ * 1. fd may have been orphaned
+ * 2. The pollset may no longer be a unary poller (and we can't let case #1
+ * leak to other pollset types!)
+ * 3. pollset's fd (which may have changed) may have been orphaned
+ * 4. The pollset may be shutting down.
+ */
+
+ gpr_mu_lock(&pollset->mu);
+ /* First we need to ensure that nobody is polling concurrently */
+ GPR_ASSERT(!pollset_has_workers(pollset));
+
+ gpr_free(up_args);
+ /* At this point the pollset may no longer be a unary poller. In that case
+ * we should just call the right add function and be done. */
+ /* TODO(klempner): If we're not careful this could cause infinite recursion.
+ * That's not a problem for now because empty_pollset has a trivial poller
+ * and we don't have any mechanism to unbecome multipoller. */
+ pollset->in_flight_cbs--;
+ if (pollset->shutting_down) {
+ /* We don't care about this pollset anymore. */
+ if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
+ pollset->called_shutdown = 1;
+ finish_shutdown(exec_ctx, pollset);
+ }
+ } else if (fd_is_orphaned(fd)) {
+ /* Don't try to add it to anything, we'll drop our ref on it below */
+ } else if (pollset->vtable != original_vtable) {
+ pollset->vtable->add_fd(exec_ctx, pollset, fd, 0);
+ } else if (fd != pollset->data.ptr) {
+ grpc_fd *fds[2];
+ fds[0] = pollset->data.ptr;
+ fds[1] = fd;
+
+ if (fds[0] && !fd_is_orphaned(fds[0])) {
+ platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds));
+ GRPC_FD_UNREF(fds[0], "basicpoll");
+ } else {
+ /* old fd is orphaned and we haven't cleaned it up until now, so remain a
+ * unary poller */
+ /* Note that it is possible that fds[1] is also orphaned at this point.
+ * That's okay, we'll correct it at the next add or poll. */
+ if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll");
+ pollset->data.ptr = fd;
+ GRPC_FD_REF(fd, "basicpoll");
+ }
+ }
+
+ gpr_mu_unlock(&pollset->mu);
+
+ /* Matching ref in basic_pollset_add_fd */
+ GRPC_FD_UNREF(fd, "basicpoll_add");
+}
+
+static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd, int and_unlock_pollset) {
+ grpc_unary_promote_args *up_args;
+ GPR_ASSERT(fd);
+ if (fd == pollset->data.ptr) goto exit;
+
+ if (!pollset_has_workers(pollset)) {
+ /* Fast path -- no in flight cbs */
+ /* TODO(klempner): Comment this out and fix any test failures or establish
+ * they are due to timing issues */
+ grpc_fd *fds[2];
+ fds[0] = pollset->data.ptr;
+ fds[1] = fd;
+
+ if (fds[0] == NULL) {
+ pollset->data.ptr = fd;
+ GRPC_FD_REF(fd, "basicpoll");
+ } else if (!fd_is_orphaned(fds[0])) {
+ platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds));
+ GRPC_FD_UNREF(fds[0], "basicpoll");
+ } else {
+ /* old fd is orphaned and we haven't cleaned it up until now, so remain a
+ * unary poller */
+ GRPC_FD_UNREF(fds[0], "basicpoll");
+ pollset->data.ptr = fd;
+ GRPC_FD_REF(fd, "basicpoll");
+ }
+ goto exit;
+ }
+
+ /* Now we need to promote. This needs to happen when we're not polling. Since
+ * this may be called from poll, the wait needs to happen asynchronously. */
+ GRPC_FD_REF(fd, "basicpoll_add");
+ pollset->in_flight_cbs++;
+ up_args = gpr_malloc(sizeof(*up_args));
+ up_args->fd = fd;
+ up_args->original_vtable = pollset->vtable;
+ up_args->pollset = pollset;
+ up_args->promotion_closure.cb = basic_do_promote;
+ up_args->promotion_closure.cb_arg = up_args;
+
+ grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
+ pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
+
+exit:
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ }
+}
+
+static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
+ gpr_timespec deadline,
+ gpr_timespec now) {
+#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
+#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
+
+ struct pollfd pfd[3];
+ grpc_fd *fd;
+ grpc_fd_watcher fd_watcher;
+ int timeout;
+ int r;
+ nfds_t nfds;
+
+ fd = pollset->data.ptr;
+ if (fd && fd_is_orphaned(fd)) {
+ GRPC_FD_UNREF(fd, "basicpoll");
+ fd = pollset->data.ptr = NULL;
+ }
+ timeout = poll_deadline_to_millis_timeout(deadline, now);
+ pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
+ pfd[0].events = POLLIN;
+ pfd[0].revents = 0;
+ pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
+ pfd[1].events = POLLIN;
+ pfd[1].revents = 0;
+ nfds = 2;
+ if (fd) {
+ pfd[2].fd = fd->fd;
+ pfd[2].revents = 0;
+ GRPC_FD_REF(fd, "basicpoll_begin");
+ gpr_mu_unlock(&pollset->mu);
+ pfd[2].events =
+ (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &fd_watcher);
+ if (pfd[2].events != 0) {
+ nfds++;
+ }
+ } else {
+ gpr_mu_unlock(&pollset->mu);
+ }
+
+ /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
+ even going into the blocking annotation if possible */
+ /* poll fd count (argument 2) is shortened by one if we have no events
+ to poll on - such that it only includes the kicker */
+ GPR_TIMER_BEGIN("poll", 0);
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ r = grpc_poll_function(pfd, nfds, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GPR_TIMER_END("poll", 0);
+
+ if (r < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ }
+ if (fd) {
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
+ }
+ } else if (r == 0) {
+ if (fd) {
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
+ }
+ } else {
+ if (pfd[0].revents & POLLIN_CHECK) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ }
+ if (pfd[1].revents & POLLIN_CHECK) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
+ }
+ if (nfds > 2) {
+ fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
+ pfd[2].revents & POLLOUT_CHECK, pollset);
+ } else if (fd) {
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
+ }
+ }
+
+ if (fd) {
+ GRPC_FD_UNREF(fd, "basicpoll_begin");
+ }
+}
+
+static void basic_pollset_destroy(grpc_pollset *pollset) {
+ if (pollset->data.ptr != NULL) {
+ GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
+ pollset->data.ptr = NULL;
+ }
+}
+
+static const grpc_pollset_vtable basic_pollset = {
+ basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock,
+ basic_pollset_destroy, basic_pollset_destroy};
+
+static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
+ pollset->vtable = &basic_pollset;
+ pollset->data.ptr = fd_or_null;
+ if (fd_or_null != NULL) {
+ GRPC_FD_REF(fd_or_null, "basicpoll");
+ }
+}
+
+/*******************************************************************************
+ * pollset_multipoller_with_poll_posix.c
+ */
+
+#ifndef GPR_LINUX_MULTIPOLL_WITH_EPOLL
+
+typedef struct {
+ /* all polled fds */
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+ /* fds that have been removed from the pollset explicitly */
+ size_t del_count;
+ size_t del_capacity;
+ grpc_fd **dels;
+} poll_hdr;
+
+static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_fd *fd,
+ int and_unlock_pollset) {
+ size_t i;
+ poll_hdr *h = pollset->data.ptr;
+ /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
+ for (i = 0; i < h->fd_count; i++) {
+ if (h->fds[i] == fd) goto exit;
+ }
+ if (h->fd_count == h->fd_capacity) {
+ h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2);
+ h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity);
+ }
+ h->fds[h->fd_count++] = fd;
+ GRPC_FD_REF(fd, "multipoller");
+exit:
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ }
+}
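
A hedged sketch of the GPR_MAX(capacity + 8, count * 3 / 2) growth rule used in the function above: small sets grow in fixed 8-slot steps, large sets grow geometrically (1.5x), keeping repeated add_fd calls amortized O(1).

```c
#include <stddef.h>

static size_t next_capacity(size_t capacity, size_t count) {
  size_t linear = capacity + 8;    /* cheap fixed step for small sets   */
  size_t geometric = count * 3 / 2; /* 1.5x step once the set is large  */
  return linear > geometric ? linear : geometric;
}
/* growth when count == capacity: 0 -> 8 -> 16 -> 24 -> 36 -> 54 -> ... */
```
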
+
+static void multipoll_with_poll_pollset_maybe_work_and_unlock(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now) {
+#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
+#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
+
+ int timeout;
+ int r;
+ size_t i, j, fd_count;
+ nfds_t pfd_count;
+ poll_hdr *h;
+ /* TODO(ctiller): inline some elements to avoid an allocation */
+ grpc_fd_watcher *watchers;
+ struct pollfd *pfds;
+
+ h = pollset->data.ptr;
+ timeout = poll_deadline_to_millis_timeout(deadline, now);
+ /* TODO(ctiller): perform just one malloc here if we exceed the inline case */
+ pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2));
+ watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2));
+ fd_count = 0;
+ pfd_count = 2;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+ pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
+ for (i = 0; i < h->fd_count; i++) {
+ int remove = fd_is_orphaned(h->fds[i]);
+ for (j = 0; !remove && j < h->del_count; j++) {
+ if (h->fds[i] == h->dels[j]) remove = 1;
+ }
+ if (remove) {
+ GRPC_FD_UNREF(h->fds[i], "multipoller");
+ } else {
+ h->fds[fd_count++] = h->fds[i];
+ watchers[pfd_count].fd = h->fds[i];
+ GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
+ pfds[pfd_count].fd = h->fds[i]->fd;
+ pfds[pfd_count].revents = 0;
+ pfd_count++;
+ }
+ }
+ for (j = 0; j < h->del_count; j++) {
+ GRPC_FD_UNREF(h->dels[j], "multipoller_del");
+ }
+ h->del_count = 0;
+ h->fd_count = fd_count;
+ gpr_mu_unlock(&pollset->mu);
+
+ for (i = 2; i < pfd_count; i++) {
+ grpc_fd *fd = watchers[i].fd;
+ pfds[i].events = (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT,
+ &watchers[i]);
+ GRPC_FD_UNREF(fd, "multipoller_start");
+ }
+
+ /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
+ even going into the blocking annotation if possible */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ r = grpc_poll_function(pfds, pfd_count, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+
+ if (r < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ }
+ for (i = 2; i < pfd_count; i++) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
+ }
+ } else if (r == 0) {
+ for (i = 2; i < pfd_count; i++) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
+ }
+ } else {
+ if (pfds[0].revents & POLLIN_CHECK) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ }
+ if (pfds[1].revents & POLLIN_CHECK) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
+ }
+ for (i = 2; i < pfd_count; i++) {
+ if (watchers[i].fd == NULL) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
+ continue;
+ }
+ fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
+ pfds[i].revents & POLLOUT_CHECK, pollset);
+ }
+ }
+
+ gpr_free(pfds);
+ gpr_free(watchers);
+}
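
Slots 0 and 1 of the pollfd array are always the global and per-worker wakeup fds, which is why every loop over the watched fds starts at index 2 and why a delivered wakeup must be consumed before the next poll. The underlying self-pipe trick, demonstrated in isolation (plain POSIX; an ordinary pipe stands in for grpc_wakeup_fd):

#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
  int pipefd[2];
  char buf = 0;
  if (pipe(pipefd) != 0) return 1;

  /* "kick": writing one byte makes the read end readable */
  if (write(pipefd[1], &buf, 1) != 1) return 1;

  struct pollfd pfd = {pipefd[0], POLLIN, 0};
  int r = poll(&pfd, 1, 1000); /* returns immediately instead of sleeping */
  if (r > 0 && (pfd.revents & POLLIN)) {
    /* consume the wakeup so the next poll() doesn't spin on it */
    if (read(pipefd[0], &buf, 1) == 1) printf("woken by self-pipe\n");
  }
  return 0;
}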
+
+static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
+ size_t i;
+ poll_hdr *h = pollset->data.ptr;
+ for (i = 0; i < h->fd_count; i++) {
+ GRPC_FD_UNREF(h->fds[i], "multipoller");
+ }
+ for (i = 0; i < h->del_count; i++) {
+ GRPC_FD_UNREF(h->dels[i], "multipoller_del");
+ }
+ h->fd_count = 0;
+ h->del_count = 0;
+}
+
+static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
+ poll_hdr *h = pollset->data.ptr;
+ multipoll_with_poll_pollset_finish_shutdown(pollset);
+ gpr_free(h->fds);
+ gpr_free(h->dels);
+ gpr_free(h);
+}
+
+static const grpc_pollset_vtable multipoll_with_poll_pollset = {
+ multipoll_with_poll_pollset_add_fd,
+ multipoll_with_poll_pollset_maybe_work_and_unlock,
+ multipoll_with_poll_pollset_finish_shutdown,
+ multipoll_with_poll_pollset_destroy};
+
+static void poll_become_multipoller(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset, grpc_fd **fds,
+ size_t nfds) {
+ size_t i;
+ poll_hdr *h = gpr_malloc(sizeof(poll_hdr));
+ pollset->vtable = &multipoll_with_poll_pollset;
+ pollset->data.ptr = h;
+ h->fd_count = nfds;
+ h->fd_capacity = nfds;
+ h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
+ h->del_count = 0;
+ h->del_capacity = 0;
+ h->dels = NULL;
+ for (i = 0; i < nfds; i++) {
+ h->fds[i] = fds[i];
+ GRPC_FD_REF(fds[i], "multipoller");
+ }
+}
+
+#endif /* !GPR_LINUX_MULTIPOLL_WITH_EPOLL */
+
+/*******************************************************************************
+ * pollset_multipoller_with_epoll_posix.c
+ */
+
+#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
+
+#include <errno.h>
+#include <poll.h>
+#include <string.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/block_annotate.h"
+
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st,
+ grpc_pollset *read_notifier_pollset) {
+ /* only one set_ready can be active at once (but there may be a racing
+ notify_on) */
+ gpr_mu_lock(&fd->mu);
+ set_ready_locked(exec_ctx, fd, st);
+
+ /* A non-NULL read_notifier_pollset means that the fd is readable. */
+ if (read_notifier_pollset != NULL) {
+ /* Note: Since the fd might be a part of multiple pollsets, this might be
+ * called multiple times (for each time the fd becomes readable) and it is
+ * okay to set the fd's read-notifier pollset to any one of these pollsets */
+ set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+ }
+
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_pollset *notifier_pollset) {
+ set_ready(exec_ctx, fd, &fd->read_closure, notifier_pollset);
+}
+
+static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ set_ready(exec_ctx, fd, &fd->write_closure, NULL);
+}
+
+struct epoll_fd_list {
+ int *epoll_fds;
+ size_t count;
+ size_t capacity;
+};
+
+static struct epoll_fd_list epoll_fd_global_list;
+static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT;
+static gpr_mu epoll_fd_list_mu;
+
+static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); }
+
+static void add_epoll_fd_to_global_list(int epoll_fd) {
+ gpr_once_init(&init_epoll_fd_list_mu, init_mu);
+
+ gpr_mu_lock(&epoll_fd_list_mu);
+ if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) {
+ epoll_fd_global_list.capacity =
+ GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2);
+ epoll_fd_global_list.epoll_fds =
+ gpr_realloc(epoll_fd_global_list.epoll_fds,
+ epoll_fd_global_list.capacity * sizeof(int));
+ }
+ epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd;
+ gpr_mu_unlock(&epoll_fd_list_mu);
+}
+
+static void remove_epoll_fd_from_global_list(int epoll_fd) {
+ gpr_mu_lock(&epoll_fd_list_mu);
+ GPR_ASSERT(epoll_fd_global_list.count > 0);
+ for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
+ if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) {
+ epoll_fd_global_list.epoll_fds[i] =
+ epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)];
+ break;
+ }
+ }
+ gpr_mu_unlock(&epoll_fd_list_mu);
+}
+
+static void remove_fd_from_all_epoll_sets(int fd) {
+ int err;
+ gpr_once_init(&init_epoll_fd_list_mu, init_mu);
+ gpr_mu_lock(&epoll_fd_list_mu);
+ if (epoll_fd_global_list.count == 0) {
+ gpr_mu_unlock(&epoll_fd_list_mu);
+ return;
+ }
+ for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
+ err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL);
+ if (err < 0 && errno != ENOENT) {
+ gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd,
+ strerror(errno));
+ }
+ }
+ gpr_mu_unlock(&epoll_fd_list_mu);
+}
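
remove_fd_from_all_epoll_sets treats ENOENT as benign because a given fd may never have been registered with a particular epoll set. The same idiom in isolation (Linux-only):

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void) {
  int fds[2];
  int epfd = epoll_create1(EPOLL_CLOEXEC);
  if (epfd < 0 || pipe(fds) != 0) return 1;

  /* Deleting an fd that was never added: ENOENT is expected and benign. */
  if (epoll_ctl(epfd, EPOLL_CTL_DEL, fds[0], NULL) < 0 && errno != ENOENT) {
    fprintf(stderr, "unexpected epoll_ctl failure: %s\n", strerror(errno));
    return 1;
  }
  printf("ENOENT tolerated, as in remove_fd_from_all_epoll_sets\n");
  close(fds[0]);
  close(fds[1]);
  close(epfd);
  return 0;
}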
+
+typedef struct {
+ grpc_pollset *pollset;
+ grpc_fd *fd;
+ grpc_closure closure;
+} delayed_add;
+
+typedef struct { int epoll_fd; } epoll_hdr;
+
+static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
+ epoll_hdr *h = pollset->data.ptr;
+ struct epoll_event ev;
+ int err;
+ grpc_fd_watcher watcher;
+
+ /* We pretend to be polling whilst adding an fd to keep the fd from being
+ closed during the add. This may result in a spurious wakeup being assigned
+ to this pollset whilst adding, but that should be benign. */
+ GPR_ASSERT(fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
+ if (watcher.fd != NULL) {
+ ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
+ ev.data.ptr = fd;
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (err < 0) {
+ /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
+ if (errno != EEXIST) {
+ gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
+ strerror(errno));
+ }
+ }
+ }
+ fd_end_poll(exec_ctx, &watcher, 0, 0, NULL);
+}
+
+static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
+ bool iomgr_status) {
+ delayed_add *da = arg;
+
+ if (!fd_is_orphaned(da->fd)) {
+ finally_add_fd(exec_ctx, da->pollset, da->fd);
+ }
+
+ gpr_mu_lock(&da->pollset->mu);
+ da->pollset->in_flight_cbs--;
+ if (da->pollset->shutting_down) {
+ /* We don't care about this pollset anymore. */
+ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
+ da->pollset->called_shutdown = 1;
+ grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL);
+ }
+ }
+ gpr_mu_unlock(&da->pollset->mu);
+
+ GRPC_FD_UNREF(da->fd, "delayed_add");
+
+ gpr_free(da);
+}
+
+static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_fd *fd,
+ int and_unlock_pollset) {
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ finally_add_fd(exec_ctx, pollset, fd);
+ } else {
+ delayed_add *da = gpr_malloc(sizeof(*da));
+ da->pollset = pollset;
+ da->fd = fd;
+ GRPC_FD_REF(fd, "delayed_add");
+ grpc_closure_init(&da->closure, perform_delayed_add, da);
+ pollset->in_flight_cbs++;
+ grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL);
+ }
+}
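
When and_unlock_pollset is 0 the caller still holds pollset->mu, so the add is deferred through an exec_ctx closure rather than performed inline, and in_flight_cbs lets shutdown wait for the deferral. A reduced sketch of deferring work through a callback queue (hypothetical types, not the grpc_exec_ctx API):

#include <stdio.h>

typedef void (*closure_fn)(void *arg);

typedef struct closure {
  closure_fn fn;
  void *arg;
  struct closure *next;
} closure;

typedef struct {
  closure *head;
  closure *tail;
} exec_ctx;

static void ctx_enqueue(exec_ctx *ctx, closure *c) {
  c->next = NULL;
  if (ctx->tail != NULL) {
    ctx->tail->next = c;
  } else {
    ctx->head = c;
  }
  ctx->tail = c;
}

/* Runs deferred callbacks once all locks have been released. */
static void ctx_flush(exec_ctx *ctx) {
  while (ctx->head != NULL) {
    closure *c = ctx->head;
    ctx->head = c->next;
    if (ctx->head == NULL) ctx->tail = NULL;
    c->fn(c->arg);
  }
}

static void delayed_add(void *arg) { printf("adding fd %d later\n", *(int *)arg); }

int main(void) {
  exec_ctx ctx = {NULL, NULL};
  int fd = 42;
  closure c = {delayed_add, &fd, NULL};
  ctx_enqueue(&ctx, &c); /* cannot add now: the pollset lock is still held */
  /* ... caller releases its locks ... */
  ctx_flush(&ctx);       /* the deferred add runs here */
  return 0;
}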
+
+/* TODO(klempner): We probably want to turn this down a bit */
+#define GRPC_EPOLL_MAX_EVENTS 1000
+
+static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now) {
+ struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
+ int ep_rv;
+ int poll_rv;
+ epoll_hdr *h = pollset->data.ptr;
+ int timeout_ms;
+ struct pollfd pfds[2];
+
+ /* If you want to ignore epoll's ability to sanely handle parallel pollers,
+ * for a more apples-to-apples performance comparison with poll, add a
+ * if (pollset->counter != 0) { return 0; }
+ * here.
+ */
+
+ gpr_mu_unlock(&pollset->mu);
+
+ timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
+
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+ pfds[1].fd = h->epoll_fd;
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
+
+ /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
+ even going into the blocking annotation if possible */
+ GPR_TIMER_BEGIN("poll", 0);
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GPR_TIMER_END("poll", 0);
+
+ if (poll_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ }
+ } else if (poll_rv == 0) {
+ /* do nothing */
+ } else {
+ if (pfds[0].revents) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
+ }
+ if (pfds[1].revents) {
+ do {
+ /* The following epoll_wait never blocks; it has a timeout of 0 */
+ ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
+ if (ep_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
+ }
+ } else {
+ int i;
+ for (i = 0; i < ep_rv; ++i) {
+ grpc_fd *fd = ep_ev[i].data.ptr;
+ /* TODO(klempner): We might want to consider making err and pri
+ * separate events */
+ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+ int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write_ev = ep_ev[i].events & EPOLLOUT;
+ if (fd == NULL) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ } else {
+ if (read_ev || cancel) {
+ fd_become_readable(exec_ctx, fd, pollset);
+ }
+ if (write_ev || cancel) {
+ fd_become_writable(exec_ctx, fd);
+ }
+ }
+ }
+ }
+ } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
+ }
+ }
+}
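
Because fds are registered edge-triggered (EPOLLET), the code drains epoll_wait with a zero timeout and retries whenever a full batch of GRPC_EPOLL_MAX_EVENTS entries comes back, since a full batch may mean more events are still queued. The drain loop reduced to its essentials (hypothetical batch size):

#include <sys/epoll.h>

#define MAX_EVENTS 4 /* hypothetical; the real code uses GRPC_EPOLL_MAX_EVENTS = 1000 */

/* Drains every currently-ready event without ever blocking. */
void drain_epoll(int epoll_fd) {
  struct epoll_event ev[MAX_EVENTS];
  int n;
  do {
    n = epoll_wait(epoll_fd, ev, MAX_EVENTS, 0); /* timeout 0: never blocks */
    for (int i = 0; i < n; i++) {
      /* dispatch ev[i].data.ptr to its readable/writable handler here */
    }
  } while (n == MAX_EVENTS); /* a full batch may mean more events remain */
}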
+
+static void multipoll_with_epoll_pollset_finish_shutdown(
+ grpc_pollset *pollset) {}
+
+static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
+ epoll_hdr *h = pollset->data.ptr;
+ close(h->epoll_fd);
+ remove_epoll_fd_from_global_list(h->epoll_fd);
+ gpr_free(h);
+}
+
+static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
+ multipoll_with_epoll_pollset_add_fd,
+ multipoll_with_epoll_pollset_maybe_work_and_unlock,
+ multipoll_with_epoll_pollset_finish_shutdown,
+ multipoll_with_epoll_pollset_destroy};
+
+static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset, grpc_fd **fds,
+ size_t nfds) {
+ size_t i;
+ epoll_hdr *h = gpr_malloc(sizeof(epoll_hdr));
+ struct epoll_event ev;
+ int err;
+
+ pollset->vtable = &multipoll_with_epoll_pollset;
+ pollset->data.ptr = h;
+ h->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ if (h->epoll_fd < 0) {
+ /* TODO(klempner): Fall back to poll here, especially on ENOSYS */
+ gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
+ abort();
+ }
+ add_epoll_fd_to_global_list(h->epoll_fd);
+
+ ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+ ev.data.ptr = NULL;
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
+ GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
+ if (err < 0) {
+ gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
+ GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
+ strerror(errno));
+ }
+
+ for (i = 0; i < nfds; i++) {
+ multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
+ }
+}
+
+#else /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
+
+static void remove_fd_from_all_epoll_sets(int fd) {}
+
+#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
+
+/*******************************************************************************
+ * pollset_set_posix.c
+ */
+
+static grpc_pollset_set *pollset_set_create(void) {
+ grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
+ memset(pollset_set, 0, sizeof(*pollset_set));
+ gpr_mu_init(&pollset_set->mu);
+ return pollset_set;
+}
+
+static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
+ size_t i;
+ gpr_mu_destroy(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+ }
+ gpr_free(pollset_set->pollsets);
+ gpr_free(pollset_set->pollset_sets);
+ gpr_free(pollset_set->fds);
+ gpr_free(pollset_set);
+}
+
+static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {
+ size_t i, j;
+ gpr_mu_lock(&pollset_set->mu);
+ if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
+ pollset_set->pollset_capacity =
+ GPR_MAX(8, 2 * pollset_set->pollset_capacity);
+ pollset_set->pollsets =
+ gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
+ sizeof(*pollset_set->pollsets));
+ }
+ pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
+ for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
+ if (fd_is_orphaned(pollset_set->fds[i])) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+ } else {
+ pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
+ pollset_set->fds[j++] = pollset_set->fds[i];
+ }
+ }
+ pollset_set->fd_count = j;
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ for (i = 0; i < pollset_set->pollset_count; i++) {
+ if (pollset_set->pollsets[i] == pollset) {
+ pollset_set->pollset_count--;
+ GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
+ pollset_set->pollsets[pollset_set->pollset_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i, j;
+ gpr_mu_lock(&bag->mu);
+ if (bag->pollset_set_count == bag->pollset_set_capacity) {
+ bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
+ bag->pollset_sets =
+ gpr_realloc(bag->pollset_sets,
+ bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
+ }
+ bag->pollset_sets[bag->pollset_set_count++] = item;
+ for (i = 0, j = 0; i < bag->fd_count; i++) {
+ if (fd_is_orphaned(bag->fds[i])) {
+ GRPC_FD_UNREF(bag->fds[i], "pollset_set");
+ } else {
+ pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
+ bag->fds[j++] = bag->fds[i];
+ }
+ }
+ bag->fd_count = j;
+ gpr_mu_unlock(&bag->mu);
+}
+
+static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i;
+ gpr_mu_lock(&bag->mu);
+ for (i = 0; i < bag->pollset_set_count; i++) {
+ if (bag->pollset_sets[i] == item) {
+ bag->pollset_set_count--;
+ GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
+ bag->pollset_sets[bag->pollset_set_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&bag->mu);
+}
+
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ if (pollset_set->fd_count == pollset_set->fd_capacity) {
+ pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
+ pollset_set->fds = gpr_realloc(
+ pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
+ }
+ GRPC_FD_REF(fd, "pollset_set");
+ pollset_set->fds[pollset_set->fd_count++] = fd;
+ for (i = 0; i < pollset_set->pollset_count; i++) {
+ pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
+ }
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ if (pollset_set->fds[i] == fd) {
+ pollset_set->fd_count--;
+ GPR_SWAP(grpc_fd *, pollset_set->fds[i],
+ pollset_set->fds[pollset_set->fd_count]);
+ GRPC_FD_UNREF(fd, "pollset_set");
+ break;
+ }
+ }
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
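
All the pollset_set removal paths share one O(1) idiom: swap the matching element with the last entry and shrink the count, trading element order for constant-time deletion. The idiom in isolation:

#include <stdio.h>

#define SWAP(type, a, b) \
  do {                   \
    type tmp = (a);      \
    (a) = (b);           \
    (b) = tmp;           \
  } while (0)

int main(void) {
  int items[5] = {10, 20, 30, 40, 50};
  size_t count = 5;

  /* remove the element at index 1 without preserving order */
  size_t i = 1;
  count--;
  SWAP(int, items[i], items[count]);

  for (size_t j = 0; j < count; j++) printf("%d ", items[j]); /* 10 50 30 40 */
  printf("\n");
  return 0;
}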
+
+/*******************************************************************************
+ * event engine binding
+ */
+
+static void shutdown_engine(void) {
+ fd_global_shutdown();
+ pollset_global_shutdown();
+}
+
+static const grpc_event_engine_vtable vtable = {
+ .pollset_size = sizeof(grpc_pollset),
+
+ .fd_create = fd_create,
+ .fd_wrapped_fd = fd_wrapped_fd,
+ .fd_orphan = fd_orphan,
+ .fd_shutdown = fd_shutdown,
+ .fd_notify_on_read = fd_notify_on_read,
+ .fd_notify_on_write = fd_notify_on_write,
+ .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+
+ .pollset_init = pollset_init,
+ .pollset_shutdown = pollset_shutdown,
+ .pollset_reset = pollset_reset,
+ .pollset_destroy = pollset_destroy,
+ .pollset_work = pollset_work,
+ .pollset_kick = pollset_kick,
+ .pollset_add_fd = pollset_add_fd,
+
+ .pollset_set_create = pollset_set_create,
+ .pollset_set_destroy = pollset_set_destroy,
+ .pollset_set_add_pollset = pollset_set_add_pollset,
+ .pollset_set_del_pollset = pollset_set_del_pollset,
+ .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
+ .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
+ .pollset_set_add_fd = pollset_set_add_fd,
+ .pollset_set_del_fd = pollset_set_del_fd,
+
+ .kick_poller = kick_poller,
+
+ .shutdown_engine = shutdown_engine,
+};
+
+const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) {
+#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
+ platform_become_multipoller = epoll_become_multipoller;
+#else
+ platform_become_multipoller = poll_become_multipoller;
+#endif
+ fd_global_init();
+ pollset_global_init();
+ return &vtable;
+}
+
+#endif
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.h b/src/core/lib/iomgr/ev_poll_and_epoll_posix.h
new file mode 100644
index 0000000000..06d6dbf29d
--- /dev/null
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_AND_EPOLL_POSIX_H
+#define GRPC_CORE_LIB_IOMGR_EV_POLL_AND_EPOLL_POSIX_H
+
+#include "src/core/lib/iomgr/ev_posix.h"
+
+const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void);
+
+#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_AND_EPOLL_POSIX_H */
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index e2a21230b9..0167999dad 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -59,8 +59,6 @@
* FD declarations
*/
-grpc_wakeup_fd grpc_global_wakeup_fd;
-
typedef struct grpc_fd_watcher {
struct grpc_fd_watcher *next;
struct grpc_fd_watcher *prev;
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 95520b01d3..6477b05dcd 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -44,6 +44,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
+#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"
#include "src/core/lib/iomgr/ev_poll_posix.h"
#include "src/core/lib/support/env.h"
@@ -61,7 +62,7 @@ typedef struct {
} event_engine_factory;
static const event_engine_factory g_factories[] = {
- {"poll", grpc_init_poll_posix},
+ {"poll", grpc_init_poll_posix}, {"legacy", grpc_init_poll_and_epoll_posix},
};
static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
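
With the factory table extended, the combined engine is selectable by its registered name. Judging from the surrounding ev_posix.c logic (not shown in this hunk), the GRPC_POLL_STRATEGY environment variable chooses among the factories, so a process could opt into the legacy engine before initializing gRPC; a sketch under that assumption:

#include <stdlib.h>

#include <grpc/grpc.h>

int main(void) {
  /* Must be set before grpc_init() so the factory list above is consulted
     with this preference; "legacy" names the poll-and-epoll engine. */
  setenv("GRPC_POLL_STRATEGY", "legacy", 1);
  grpc_init();
  /* ... create channels/servers as usual ... */
  grpc_shutdown();
  return 0;
}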
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index d92addbf54..dcdddc769e 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -235,8 +235,16 @@ namespace Grpc.Core.Tests
await barrier.Task; // make sure the handler has started.
cts.Cancel();
- var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
- Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+ try
+ {
+ // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+ await call.ResponseAsync;
+ Assert.Fail();
+ }
+ catch (RpcException ex)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+ }
}
[Test]
@@ -265,9 +273,15 @@ namespace Grpc.Core.Tests
await handlerStartedBarrier.Task;
cts.Cancel();
- var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
- Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
-
+ try
+ {
+ await call.ResponseAsync;
+ Assert.Fail();
+ }
+ catch (RpcException ex)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+ }
Assert.AreEqual("SUCCESS", await successTcs.Task);
}
diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
index cec8c7ce6b..6a156293ad 100644
--- a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
@@ -105,7 +105,15 @@ namespace Grpc.Core.Tests
var parentCall = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
await readyToCancelTcs.Task;
cts.Cancel();
- Assert.ThrowsAsync(typeof(RpcException), async () => await parentCall);
+ try
+ {
+ // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+ await parentCall;
+ Assert.Fail();
+ }
+ catch (RpcException)
+ {
+ }
Assert.AreEqual("CHILD_CALL_CANCELLED", await successTcs.Task);
}
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index ab12c120cb..6fe382751a 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -32,7 +32,7 @@
#endregion
using System;
-using System.Threading;
+using System.Linq;
using Grpc.Core;
using NUnit.Framework;
@@ -44,7 +44,11 @@ namespace Grpc.Core.Tests
public void InitializeAndShutdownGrpcEnvironment()
{
var env = GrpcEnvironment.AddRef();
- Assert.IsNotNull(env.CompletionQueue);
+ Assert.IsTrue(env.CompletionQueues.Count > 0);
+ for (int i = 0; i < env.CompletionQueues.Count; i++)
+ {
+ Assert.IsNotNull(env.CompletionQueues.ElementAt(i));
+ }
GrpcEnvironment.Release();
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index 0e204761f6..c35aaf680f 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -53,8 +53,6 @@ namespace Grpc.Core.Internal.Tests
[SetUp]
public void Init()
{
- var environment = GrpcEnvironment.AddRef();
-
// Create a fake server just so we have an instance to refer to.
// The server won't actually be used at all.
server = new Server()
@@ -66,7 +64,6 @@ namespace Grpc.Core.Internal.Tests
fakeCall = new FakeNativeCall();
asyncCallServer = new AsyncCallServer<string, string>(
Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer,
- environment,
server);
asyncCallServer.InitializeForTesting(fakeCall);
}
@@ -75,7 +72,6 @@ namespace Grpc.Core.Internal.Tests
public void Cleanup()
{
server.ShutdownAsync().Wait();
- GrpcEnvironment.Release();
}
[Test]
@@ -136,7 +132,6 @@ namespace Grpc.Core.Internal.Tests
public void WriteAfterCancelNotificationFails()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
- var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
@@ -181,6 +176,21 @@ namespace Grpc.Core.Internal.Tests
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
+ [Test]
+ public void WriteAfterWriteStatusThrowsInvalidOperationException()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
+
+ asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
+ Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1"));
+
+ fakeCall.SendStatusFromServerHandler(true);
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask)
{
Assert.IsTrue(fakeCall.IsDisposed);
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 777a1c8c50..81897f8c77 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
-using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@@ -82,7 +81,7 @@ namespace Grpc.Core.Internal.Tests
Assert.ThrowsAsync(typeof(InvalidOperationException),
async () => await asyncCall.ReadMessageAsync());
Assert.Throws(typeof(InvalidOperationException),
- () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
+ () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
}
[Test]
@@ -103,7 +102,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
- CreateResponsePayload(),
+ null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@@ -148,7 +147,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
- CreateResponsePayload(),
+ null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@@ -193,7 +192,7 @@ namespace Grpc.Core.Internal.Tests
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
- CreateResponsePayload(),
+ null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
@@ -211,7 +210,9 @@ namespace Grpc.Core.Internal.Tests
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
- var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+
+ var writeTask = requestStream.WriteAsync("request1");
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
@@ -223,11 +224,13 @@ namespace Grpc.Core.Internal.Tests
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
- CreateResponsePayload(),
+ null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
- var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+
+ var writeTask = requestStream.WriteAsync("request1");
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
}
@@ -267,7 +270,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
+ public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -275,11 +278,12 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
- Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
+ var writeTask = requestStream.WriteAsync("request1");
+ Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Cancelled),
- CreateResponsePayload(),
+ null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
@@ -290,7 +294,7 @@ namespace Grpc.Core.Internal.Tests
{
asyncCall.StartServerStreamingCall("request1");
Assert.Throws(typeof(InvalidOperationException),
- () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
+ () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
}
[Test]
@@ -390,12 +394,13 @@ namespace Grpc.Core.Internal.Tests
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
- var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
+ var writeTask = requestStream.WriteAsync("request1");
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
- public void DuplexStreaming_CompleteAfterReceivingStatusFails()
+ public void DuplexStreaming_CompleteAfterReceivingStatusSucceeds()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -411,7 +416,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
+ public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -419,7 +424,9 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
- Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
+
+ var writeTask = requestStream.WriteAsync("request1");
+ Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
diff --git a/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
index 0663e77d1e..d770f82390 100644
--- a/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
+++ b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
@@ -134,7 +134,15 @@ namespace Grpc.Core.Tests
{
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
- Assert.ThrowsAsync<IOException>(async () => await requestStream.MoveNext());
+ try
+ {
+ // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+ await requestStream.MoveNext();
+ Assert.Fail();
+ }
+ catch (IOException)
+ {
+ }
return "RESPONSE";
});
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 93a6e6a3d9..886adfec33 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -31,7 +31,6 @@
using System;
using System.Collections.Generic;
-using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -56,6 +55,7 @@ namespace Grpc.Core
readonly string target;
readonly GrpcEnvironment environment;
+ readonly CompletionQueueSafeHandle completionQueue;
readonly ChannelSafeHandle handle;
readonly Dictionary<string, ChannelOption> options;
@@ -75,6 +75,7 @@ namespace Grpc.Core
EnsureUserAgentChannelOption(this.options);
this.environment = GrpcEnvironment.AddRef();
+ this.completionQueue = this.environment.PickCompletionQueue();
using (var nativeCredentials = credentials.ToNativeCredentials())
using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values))
{
@@ -135,7 +136,7 @@ namespace Grpc.Core
tcs.SetCanceled();
}
});
- handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler);
+ handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler);
return tcs.Task;
}
@@ -231,6 +232,14 @@ namespace Grpc.Core
}
}
+ internal CompletionQueueSafeHandle CompletionQueue
+ {
+ get
+ {
+ return this.completionQueue;
+ }
+ }
+
internal void AddCallReference(object call)
{
activeCallCounter.Increment();
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 4bf30e83c1..a8b7b5f00d 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -86,7 +86,6 @@
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="ChannelCredentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
- <Compile Include="Internal\AsyncCompletion.cs" />
<Compile Include="Internal\AsyncCallBase.cs" />
<Compile Include="Internal\AsyncCallServer.cs" />
<Compile Include="Internal\AsyncCall.cs" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index bee0ef1d62..18af1099f1 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -32,8 +32,9 @@
#endregion
using System;
+using System.Collections.Generic;
+using System.Linq;
using System.Runtime.InteropServices;
-using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@@ -51,12 +52,13 @@ namespace Grpc.Core
static GrpcEnvironment instance;
static int refCount;
static int? customThreadPoolSize;
+ static int? customCompletionQueueCount;
static ILogger logger = new ConsoleLogger();
readonly GrpcThreadPool threadPool;
- readonly CompletionRegistry completionRegistry;
readonly DebugStats debugStats = new DebugStats();
+ readonly AtomicCounter cqPickerCounter = new AtomicCounter();
bool isClosed;
/// <summary>
@@ -141,36 +143,51 @@ namespace Grpc.Core
}
/// <summary>
+ /// Sets the number of completion queues the gRPC thread pool polls for internal RPC events.
+ /// Can only be invoked before the <c>GrpcEnvironment</c> is started and cannot be changed afterwards.
+ /// Setting the number of completion queues is an advanced setting and you should only use it if you know what you are doing.
+ /// Most users should rely on the default value provided by gRPC library.
+ /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
+ /// </summary>
+ public static void SetCompletionQueueCount(int completionQueueCount)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
+ GrpcPreconditions.CheckArgument(completionQueueCount > 0, "completionQueueCount needs to be a positive number");
+ customCompletionQueueCount = completionQueueCount;
+ }
+ }
+
+ /// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
{
GrpcNativeInit();
- completionRegistry = new CompletionRegistry(this);
- threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault());
+ threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault());
threadPool.Start();
}
/// <summary>
- /// Gets the completion registry used by this gRPC environment.
+ /// Gets the completion queues used by this gRPC environment.
/// </summary>
- internal CompletionRegistry CompletionRegistry
+ internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
- return this.completionRegistry;
+ return this.threadPool.CompletionQueues;
}
}
/// <summary>
- /// Gets the completion queue used by this gRPC environment.
+ /// Picks a completion queue in a round-robin fashion.
+ /// Shouldn't be invoked on a per-call basis (it is meant to be used on a per-channel basis).
/// </summary>
- internal CompletionQueueSafeHandle CompletionQueue
+ internal CompletionQueueSafeHandle PickCompletionQueue()
{
- get
- {
- return this.threadPool.CompletionQueue;
- }
+ var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count);
+ return this.threadPool.CompletionQueues.ElementAt(cqIndex);
}
/// <summary>
@@ -230,5 +247,15 @@ namespace Grpc.Core
// more work, but seems to work reasonably well for a start.
return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
}
+
+ private int GetCompletionQueueCountOrDefault()
+ {
+ if (customCompletionQueueCount.HasValue)
+ {
+ return customCompletionQueueCount.Value;
+ }
+ // by default, create a completion queue for each thread
+ return GetThreadPoolSizeOrDefault();
+ }
}
}
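
Both SetThreadPoolSize and the new SetCompletionQueueCount are static and must run before the first AddRef creates the singleton environment. A minimal usage sketch of the experimental knob (C#; the server address is hypothetical):

using Grpc.Core;

class Program
{
    static void Main()
    {
        // Must run before any Channel or Server triggers GrpcEnvironment.AddRef().
        GrpcEnvironment.SetCompletionQueueCount(2);

        // Each channel picks one completion queue round-robin and keeps it
        // for all of its calls.
        var channel = new Channel("localhost:50051", ChannelCredentials.Insecure);
        // ... issue calls ...
        channel.ShutdownAsync().Wait();
    }
}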
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 55351869b5..895be690a5 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -32,12 +32,7 @@
#endregion
using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
using System.Threading.Tasks;
-using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;
@@ -57,9 +52,11 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
+ // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls.
// Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+ // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
// Response headers set here once received.
TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
@@ -67,7 +64,7 @@ namespace Grpc.Core.Internal
ClientSideStatus? finishedStatus;
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
- : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
+ : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
@@ -144,7 +141,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@@ -171,7 +168,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
readingDone = true;
@@ -195,7 +192,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
@@ -220,7 +217,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
@@ -232,11 +229,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming request. Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
{
- StartSendMessageInternal(msg, writeFlags, completionDelegate);
+ return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@@ -250,29 +246,32 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends halfclose, indicating client is done with streaming requests.
/// Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendCloseFromClientAsync()
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: true);
+ GrpcPreconditions.CheckState(started);
- if (!disposed && !finished)
+ var earlyResult = CheckSendPreconditionsClientSide();
+ if (earlyResult != null)
{
- call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
+ return earlyResult;
}
- else
+
+ if (disposed || finished)
{
// In case the call has already been finished by the serverside,
- // the halfclose has already been done implicitly, so we only
- // emit the notification for the completion delegate.
- Task.Run(() => HandleSendCloseFromClientFinished(true));
+ // the halfclose has already been done implicitly, so just return
+ // a completed task here.
+ halfcloseRequested = true;
+ return Task.FromResult<object>(null);
}
+ call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
halfcloseRequested = true;
- sendCompletionDelegate = completionDelegate;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
@@ -342,6 +341,45 @@ namespace Grpc.Core.Internal
get { return true; }
}
+ protected override Task CheckSendAllowedOrEarlyResult()
+ {
+ var earlyResult = CheckSendPreconditionsClientSide();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
+
+ if (finishedStatus.HasValue)
+ {
+ // Failing the write with an RpcException if we already received status on the client
+ // side makes the most sense. Note that this applies even for StatusCode.OK.
+ // Writing after the call has finished is not a programming error, because the server can
+ // close the call at any time, so don't throw directly, but let the write task finish with
+ // an error.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetException(new RpcException(finishedStatus.Value.Status));
+ return tcs.Task;
+ }
+
+ return null;
+ }
+
+ private Task CheckSendPreconditionsClientSide()
+ {
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
+
+ if (cancelRequested)
+ {
+ // Return a cancelled task.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetCanceled();
+ return tcs.Task;
+ }
+
+ return null;
+ }
+
private void Initialize(CompletionQueueSafeHandle cq)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
@@ -368,7 +406,7 @@ namespace Grpc.Core.Internal
var credentials = details.Options.Credentials;
using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
{
- var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry,
+ var result = details.Channel.Handle.CreateCall(
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
return result;
@@ -400,6 +438,7 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
+ // TODO(jtattermusch): handle success==false
responseHeadersTcs.SetResult(responseHeaders);
}
@@ -443,19 +482,6 @@ namespace Grpc.Core.Internal
}
}
- protected override void CheckSendingAllowed(bool allowFinished)
- {
- base.CheckSendingAllowed(true);
-
- // throwing RpcException if we already received status on client
- // side makes the most sense.
- // Note that this throws even for StatusCode.OK.
- if (!allowFinished && finishedStatus.HasValue)
- {
- throw new RpcException(finishedStatus.Value.Status);
- }
- }
-
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 4de23706b2..cb8366c216 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -58,7 +58,6 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
- protected readonly GrpcEnvironment environment;
protected readonly object myLock = new object();
protected INativeCall call;
@@ -67,8 +66,8 @@ namespace Grpc.Core.Internal
protected bool started;
protected bool cancelRequested;
- protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
+ protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
@@ -78,11 +77,10 @@ namespace Grpc.Core.Internal
protected bool initialMetadataSent;
protected long streamingWritesCounter; // Number of streaming send operations started so far.
- public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
+ public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = GrpcPreconditions.CheckNotNull(serializer);
this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
- this.environment = GrpcPreconditions.CheckNotNull(environment);
}
/// <summary>
@@ -128,28 +126,31 @@ namespace Grpc.Core.Internal
/// <summary>
/// Initiates sending a message. Only one send operation can be active at a time.
- /// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
{
byte[] payload = UnsafeSerialize(msg);
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: false);
+ GrpcPreconditions.CheckState(started);
+ var earlyResult = CheckSendAllowedOrEarlyResult();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
- sendCompletionDelegate = completionDelegate;
initialMetadataSent = true;
streamingWritesCounter++;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
/// <summary>
/// Initiates reading a message. Only one read operation can be active at a time.
- /// completionDelegate is invoked upon completion.
/// </summary>
protected Task<TRead> ReadMessageInternalAsync()
{
@@ -159,7 +160,7 @@ namespace Grpc.Core.Internal
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
- // and maintain its state.
+ // and maintains its state.
GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
return streamingReadTcs.Task;
}
@@ -183,7 +184,7 @@ namespace Grpc.Core.Internal
{
if (!disposed && call != null)
{
- bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
+ bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
@@ -213,24 +214,11 @@ namespace Grpc.Core.Internal
{
}
- protected virtual void CheckSendingAllowed(bool allowFinished)
- {
- GrpcPreconditions.CheckState(started);
- CheckNotCancelled();
- GrpcPreconditions.CheckState(!disposed || allowFinished);
-
- GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
- GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished.");
- GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
- }
-
- protected void CheckNotCancelled()
- {
- if (cancelRequested)
- {
- throw new OperationCanceledException("Remote call has been cancelled.");
- }
- }
+ /// <summary>
+ /// Checks if sending is allowed and, when the outcome is already known, returns a Task that
+ /// short-circuits the send logic by serving directly as the write operation's result task. Returns null in the normal case.
+ /// </summary>
+ protected abstract Task CheckSendAllowedOrEarlyResult();
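+
+ // This abstract hook is the crux of the delegate-to-Task migration: precondition
+ // violations that are programming errors still throw, while runtime races such as
+ // cancellation or a call the server already finished surface as a pre-completed Task
+ // handed back as the write result. A reduced C# sketch of the same shape
+ // (hypothetical names, not the real class):
+ //
+ // static class SendGate
+ // {
+ //     // Returns null when the write may proceed, or a pre-completed Task that
+ //     // becomes the write operation's result without touching the native call.
+ //     public static Task CheckSendAllowedOrEarlyResult(bool cancelRequested, Exception finishedError)
+ //     {
+ //         if (cancelRequested)
+ //         {
+ //             var tcs = new TaskCompletionSource<object>();
+ //             tcs.SetCanceled();  // awaiting the write throws TaskCanceledException
+ //             return tcs.Task;
+ //         }
+ //         if (finishedError != null)
+ //         {
+ //             var tcs = new TaskCompletionSource<object>();
+ //             tcs.SetException(finishedError);  // e.g. RpcException with the final status
+ //             return tcs.Task;
+ //         }
+ //         return null;  // normal case: the caller starts the native write
+ //     }
+ // }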
protected byte[] UnsafeSerialize(TWrite msg)
{
@@ -259,39 +247,27 @@ namespace Grpc.Core.Internal
}
}
- protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error)
- {
- try
- {
- completionDelegate(value, error);
- }
- catch (Exception e)
- {
- Logger.Error(e, "Exception occured while invoking completion delegate.");
- }
- }
-
/// <summary>
/// Handles send completion.
/// </summary>
protected void HandleSendFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
+ TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
+ origTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed"));
+ origTcs.SetException(new InvalidOperationException("Send failed"));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ origTcs.SetResult(null);
}
}
@@ -300,22 +276,23 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendCloseFromClientFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
+ TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
+ origTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed."));
+ // TODO(jtattermusch): this method is the same as HandleSendFinished (only the error message differs).
+ origTcs.SetException(new InvalidOperationException("Sending close from client has failed."));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ origTcs.SetResult(null);
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index b1566b44a7..56c23ba3ef 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -51,14 +51,14 @@ namespace Grpc.Core.Internal
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly Server server;
- public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment)
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer)
{
this.server = GrpcPreconditions.CheckNotNull(server);
}
- public void Initialize(CallSafeHandle call)
+ public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
{
- call.Initialize(environment.CompletionRegistry, environment.CompletionQueue);
+ call.Initialize(completionQueue);
server.AddCallReference(this);
InitializeInternal(call);
@@ -91,11 +91,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming response. Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
{
- StartSendMessageInternal(msg, writeFlags, completionDelegate);
+ return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@@ -110,20 +109,22 @@ namespace Grpc.Core.Internal
/// Initiates sending a initial metadata.
/// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
/// to make things simpler.
- /// completionDelegate is invoked upon completion.
/// </summary>
- public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendInitialMetadataAsync(Metadata headers)
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(headers, "metadata");
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
- CheckSendingAllowed(allowFinished: false);
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ var earlyResult = CheckSendAllowedOrEarlyResult();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
@@ -131,7 +132,8 @@ namespace Grpc.Core.Internal
}
this.initialMetadataSent = true;
- sendCompletionDelegate = completionDelegate;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
@@ -196,6 +198,16 @@ namespace Grpc.Core.Internal
server.RemoveCallReference(this);
}
+ protected override Task CheckSendAllowedOrEarlyResult()
+ {
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
+ GrpcPreconditions.CheckState(!finished, "Already finished.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
+ GrpcPreconditions.CheckState(!disposed);
+
+ return null;
+ }
+
/// <summary>
/// Handles the server side close completion.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
deleted file mode 100644
index 7e86fddb4d..0000000000
--- a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
+++ /dev/null
@@ -1,94 +0,0 @@
-#region Copyright notice and license
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-#endregion
-
-using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
-using System.Threading.Tasks;
-using Grpc.Core.Internal;
-using Grpc.Core.Utils;
-
-namespace Grpc.Core.Internal
-{
- /// <summary>
- /// If error != null, there's been an error or operation has been cancelled.
- /// </summary>
- internal delegate void AsyncCompletionDelegate<T>(T result, Exception error);
-
- /// <summary>
- /// Helper for transforming AsyncCompletionDelegate into full-fledged Task.
- /// </summary>
- internal class AsyncCompletionTaskSource<T>
- {
- readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
- readonly AsyncCompletionDelegate<T> completionDelegate;
-
- public AsyncCompletionTaskSource()
- {
- completionDelegate = new AsyncCompletionDelegate<T>(HandleCompletion);
- }
-
- public Task<T> Task
- {
- get
- {
- return tcs.Task;
- }
- }
-
- public AsyncCompletionDelegate<T> CompletionDelegate
- {
- get
- {
- return completionDelegate;
- }
- }
-
- private void HandleCompletion(T value, Exception error)
- {
- if (error == null)
- {
- tcs.SetResult(value);
- return;
- }
- if (error is OperationCanceledException)
- {
- tcs.SetCanceled();
- return;
- }
- tcs.SetException(error);
- }
- }
-}
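The file deleted above existed to adapt the callback-style AsyncCompletionDelegate into a Task; with the Task-returning methods introduced in this change, callers hold a TaskCompletionSource directly (see streamingWriteTcs above) and apply the same outcome mapping where the batch completion is handled. The mapping the removed helper performed, as a standalone sketch:

    using System;
    using System.Threading.Tasks;

    // The outcome mapping the removed AsyncCompletionTaskSource performed,
    // extracted as a hypothetical helper; the new code inlines this logic
    // in its completion handlers.
    static class CompletionOutcome
    {
        public static void Apply<T>(TaskCompletionSource<T> tcs, T value, Exception error)
        {
            if (error == null)
            {
                tcs.SetResult(value);        // normal completion
            }
            else if (error is OperationCanceledException)
            {
                tcs.SetCanceled();           // cancellation maps to a cancelled Task
            }
            else
            {
                tcs.SetException(error);     // any other error faults the Task
            }
        }
    }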
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 244b97d4a4..82361f5797 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -47,16 +47,14 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
const uint GRPC_WRITE_BUFFER_HINT = 1;
- CompletionRegistry completionRegistry;
CompletionQueueSafeHandle completionQueue;
private CallSafeHandle()
{
}
- public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue)
+ public void Initialize(CompletionQueueSafeHandle completionQueue)
{
- this.completionRegistry = completionRegistry;
this.completionQueue = completionQueue;
}
@@ -70,7 +68,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@@ -90,7 +88,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
}
@@ -100,7 +98,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
}
@@ -110,7 +108,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
}
@@ -120,7 +118,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
}
@@ -130,7 +128,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
}
@@ -142,7 +140,7 @@ namespace Grpc.Core.Internal
{
var ctx = BatchContextSafeHandle.Create();
var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
@@ -153,7 +151,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
Native.grpcsharp_call_recv_message(this, ctx).CheckOk();
}
}
@@ -163,7 +161,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
}
@@ -173,7 +171,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
Native.grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
}
@@ -183,7 +181,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
}
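Every operation above now reaches the registry through completionQueue.CompletionRegistry instead of an environment-wide field, because with multiple completion queues each polling thread extracts callbacks by tag from the registry of the one queue it polls (see RunHandlerLoop in GrpcThreadPool.cs below). A toy model of such a registry, with stand-in types:

    using System;
    using System.Collections.Concurrent;

    // Toy per-queue completion registry (stand-in for the internal
    // CompletionRegistry type): callbacks are registered by tag before the
    // native call is issued and extracted by the thread polling the queue.
    class ToyCompletionRegistry
    {
        readonly ConcurrentDictionary<IntPtr, Action<bool>> callbacks =
            new ConcurrentDictionary<IntPtr, Action<bool>>();

        public void RegisterBatchCompletion(IntPtr tag, Action<bool> callback)
        {
            callbacks[tag] = callback;
        }

        public Action<bool> Extract(IntPtr tag)
        {
            Action<bool> callback;
            callbacks.TryRemove(tag, out callback);
            return callback;
        }
    }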
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 1dbd1f4e34..62864dff0c 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
- public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
+ public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
{
using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall"))
{
@@ -72,7 +72,7 @@ namespace Grpc.Core.Internal
{
result.SetCredentials(credentials);
}
- result.Initialize(registry, cq);
+ result.Initialize(cq);
return result;
}
}
@@ -82,11 +82,10 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
}
- public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq,
- CompletionRegistry completionRegistry, BatchCompletionDelegate callback)
+ public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 013f00ff6f..924de028f5 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -50,16 +50,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TRequest message)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendMessageAsync(message, GetWriteFlags());
}
public Task CompleteAsync()
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendCloseFromClient(taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendCloseFromClientAsync();
}
public WriteOptions WriteOptions
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 91364cdc70..46f5624223 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -45,6 +45,7 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
AtomicCounter shutdownRefcount = new AtomicCounter(1);
+ CompletionRegistry completionRegistry;
private CompletionQueueSafeHandle()
{
@@ -53,7 +54,13 @@ namespace Grpc.Core.Internal
public static CompletionQueueSafeHandle Create()
{
return Native.grpcsharp_completion_queue_create();
+ }
+ public static CompletionQueueSafeHandle Create(CompletionRegistry completionRegistry)
+ {
+ var cq = Native.grpcsharp_completion_queue_create();
+ cq.completionRegistry = completionRegistry;
+ return cq;
}
public CompletionQueueEvent Next()
@@ -83,6 +90,15 @@ namespace Grpc.Core.Internal
DecrementShutdownRefcount();
}
+ /// <summary>
+ /// Completion registry associated with this completion queue.
+ /// Doesn't need to be set if only using Pluck() operations.
+ /// </summary>
+ public CompletionRegistry CompletionRegistry
+ {
+ get { return completionRegistry; }
+ }
+
protected override bool ReleaseHandle()
{
Native.grpcsharp_completion_queue_destroy(handle);
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index b538726fa1..4de543bef7 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -33,15 +33,15 @@
using System;
using System.Collections.Generic;
-using System.Runtime.InteropServices;
+using System.Linq;
using System.Threading;
-using System.Threading.Tasks;
using Grpc.Core.Logging;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// Pool of threads polling on the same completion queue.
+ /// Pool of threads polling on a set of completion queues.
/// </summary>
internal class GrpcThreadPool
{
@@ -51,25 +51,31 @@ namespace Grpc.Core.Internal
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
+ readonly int completionQueueCount;
- CompletionQueueSafeHandle cq;
+ IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
- public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
+ /// <summary>
+ /// Creates a pool of threads polling on a set of completion queues.
+ /// </summary>
+ /// <param name="environment">Environment.</param>
+ /// <param name="poolSize">Pool size.</param>
+ /// <param name="completionQueueCount">Completion queue count.</param>
+ public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
{
this.environment = environment;
this.poolSize = poolSize;
+ this.completionQueueCount = completionQueueCount;
+ GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
+ "Thread pool size cannot be smaller than the number of completion queues used.");
}
public void Start()
{
lock (myLock)
{
- if (cq != null)
- {
- throw new InvalidOperationException("Already started.");
- }
-
- cq = CompletionQueueSafeHandle.Create();
+ GrpcPreconditions.CheckState(completionQueues == null, "Already started.");
+ completionQueues = CreateCompletionQueueList(environment, completionQueueCount);
for (int i = 0; i < poolSize; i++)
{
@@ -82,37 +88,48 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
- cq.Shutdown();
+ foreach (var cq in completionQueues)
+ {
+ cq.Shutdown();
+ }
+
foreach (var thread in threads)
{
thread.Join();
}
- cq.Dispose();
+ foreach (var cq in completionQueues)
+ {
+ cq.Dispose();
+ }
}
}
- internal CompletionQueueSafeHandle CompletionQueue
+ internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
- return cq;
+ return completionQueues;
}
}
- private Thread CreateAndStartThread(int i)
+ private Thread CreateAndStartThread(int threadIndex)
{
- var thread = new Thread(new ThreadStart(RunHandlerLoop));
+ var cqIndex = threadIndex % completionQueues.Count;
+ var cq = completionQueues.ElementAt(cqIndex);
+
+ var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
thread.IsBackground = false;
+ thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start();
- thread.Name = "grpc " + i;
+
return thread;
}
/// <summary>
/// Body of the polling thread.
/// </summary>
- private void RunHandlerLoop()
+ private void RunHandlerLoop(CompletionQueueSafeHandle cq)
{
CompletionQueueEvent ev;
do
@@ -124,7 +141,7 @@ namespace Grpc.Core.Internal
IntPtr tag = ev.tag;
try
{
- var callback = environment.CompletionRegistry.Extract(tag);
+ var callback = cq.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
@@ -135,5 +152,16 @@ namespace Grpc.Core.Internal
}
while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
}
+
+ private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
+ {
+ var list = new List<CompletionQueueSafeHandle>();
+ for (int i = 0; i < completionQueueCount; i++)
+ {
+ var completionRegistry = new CompletionRegistry(environment);
+ list.Add(CompletionQueueSafeHandle.Create(completionRegistry));
+ }
+ return list.AsReadOnly();
+ }
}
}
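The assignment in CreateAndStartThread is a plain modulo, so threads are spread evenly across queues whenever the pool size is a multiple of the queue count (the precondition above only requires poolSize >= completionQueueCount). A runnable illustration of the mapping:

    using System;

    // Prints which completion queue each pool thread would poll:
    // with poolSize = 4 and completionQueueCount = 2, threads 0 and 2
    // poll cq 0 while threads 1 and 3 poll cq 1.
    class ThreadQueueAssignmentDemo
    {
        static void Main()
        {
            int poolSize = 4, completionQueueCount = 2;
            for (int threadIndex = 0; threadIndex < poolSize; threadIndex++)
            {
                int cqIndex = threadIndex % completionQueueCount;
                Console.WriteLine("grpc {0} (cq {1})", threadIndex, cqIndex);
            }
        }
    }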
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index 42fd4d4dc6..65607ed120 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -137,6 +137,7 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_server_credentials_release_delegate grpcsharp_server_credentials_release;
public readonly Delegates.grpcsharp_server_create_delegate grpcsharp_server_create;
+ public readonly Delegates.grpcsharp_server_register_completion_queue_delegate grpcsharp_server_register_completion_queue;
public readonly Delegates.grpcsharp_server_add_insecure_http2_port_delegate grpcsharp_server_add_insecure_http2_port;
public readonly Delegates.grpcsharp_server_add_secure_http2_port_delegate grpcsharp_server_add_secure_http2_port;
public readonly Delegates.grpcsharp_server_start_delegate grpcsharp_server_start;
@@ -244,6 +245,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = GetMethodDelegate<Delegates.grpcsharp_server_credentials_release_delegate>(library);
this.grpcsharp_server_create = GetMethodDelegate<Delegates.grpcsharp_server_create_delegate>(library);
+ this.grpcsharp_server_register_completion_queue = GetMethodDelegate<Delegates.grpcsharp_server_register_completion_queue_delegate>(library);
this.grpcsharp_server_add_insecure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_insecure_http2_port_delegate>(library);
this.grpcsharp_server_add_secure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_secure_http2_port_delegate>(library);
this.grpcsharp_server_start = GetMethodDelegate<Delegates.grpcsharp_server_start_delegate>(library);
@@ -348,6 +350,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = PInvokeMethods.grpcsharp_server_credentials_release;
this.grpcsharp_server_create = PInvokeMethods.grpcsharp_server_create;
+ this.grpcsharp_server_register_completion_queue = PInvokeMethods.grpcsharp_server_register_completion_queue;
this.grpcsharp_server_add_insecure_http2_port = PInvokeMethods.grpcsharp_server_add_insecure_http2_port;
this.grpcsharp_server_add_secure_http2_port = PInvokeMethods.grpcsharp_server_add_secure_http2_port;
this.grpcsharp_server_start = PInvokeMethods.grpcsharp_server_start;
@@ -493,7 +496,8 @@ namespace Grpc.Core.Internal
public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth);
public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials);
- public delegate ServerSafeHandle grpcsharp_server_create_delegate(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
+ public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args);
+ public delegate void grpcsharp_server_register_completion_queue_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq);
public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr);
public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server);
@@ -773,7 +777,10 @@ namespace Grpc.Core.Internal
// ServerSafeHandle
[DllImport("grpc_csharp_ext.dll")]
- public static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
+ public static extern ServerSafeHandle grpcsharp_server_create(ChannelArgsSafeHandle args);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ public static extern void grpcsharp_server_register_completion_queue(ServerSafeHandle server, CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
public static extern int grpcsharp_server_add_insecure_http2_port(ServerSafeHandle server, string addr);
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index febebba209..6a2f520163 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -44,7 +44,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
- Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment);
+ Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
@@ -62,14 +62,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -121,14 +121,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -179,14 +179,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -237,14 +237,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -281,13 +281,13 @@ namespace Grpc.Core.Internal
{
public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
- (payload) => payload, (payload) => payload, environment, newRpc.Server);
+ (payload) => payload, (payload) => payload, newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index ecfee0bfdd..25b79b4398 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -52,16 +52,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendMessageAsync(message, GetWriteFlags());
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendInitialMetadataAsync(responseHeaders);
}
public WriteOptions WriteOptions
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index 6b5f70e220..8581302706 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -31,12 +31,6 @@
#endregion
-using System;
-using System.Collections.Concurrent;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
-using Grpc.Core.Utils;
-
namespace Grpc.Core.Internal
{
/// <summary>
@@ -50,12 +44,17 @@ namespace Grpc.Core.Internal
{
}
- public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
+ public static ServerSafeHandle NewServer(ChannelArgsSafeHandle args)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
- return Native.grpcsharp_server_create(cq, args);
+ return Native.grpcsharp_server_create(args);
+ }
+
+ public void RegisterCompletionQueue(CompletionQueueSafeHandle cq)
+ {
+ Native.grpcsharp_server_register_completion_queue(this, cq);
}
public int AddInsecurePort(string addr)
@@ -73,18 +72,18 @@ namespace Grpc.Core.Internal
Native.grpcsharp_server_start(this);
}
- public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment)
+ public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
- environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
- Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx);
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
}
- public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment)
+ public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
- environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
- Native.grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk();
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
}
protected override bool ReleaseHandle()
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index d538a4671f..069185e13a 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -34,8 +34,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
+using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
@@ -48,7 +47,7 @@ namespace Grpc.Core
/// </summary>
public class Server
{
- const int InitialAllowRpcTokenCount = 10;
+ const int InitialAllowRpcTokenCountPerCq = 10;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
@@ -80,7 +79,12 @@ namespace Grpc.Core
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
- this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
+ this.handle = ServerSafeHandle.NewServer(channelArgs);
+ }
+
+ foreach (var cq in environment.CompletionQueues)
+ {
+ this.handle.RegisterCompletionQueue(cq);
}
}
@@ -133,9 +137,12 @@ namespace Grpc.Core
// Starting with more than one AllowOneRpc token can significantly increase
// unary RPC throughput.
- for (int i = 0; i < InitialAllowRpcTokenCount; i++)
+ for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
{
- AllowOneRpc();
+ foreach (var cq in environment.CompletionQueues)
+ {
+ AllowOneRpc(cq);
+ }
}
}
}
@@ -154,7 +161,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(HandleServerShutdown, environment);
+ var cq = environment.CompletionQueues.First(); // any cq will do
+ handle.ShutdownAndNotify(HandleServerShutdown, cq);
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
@@ -174,7 +182,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(HandleServerShutdown, environment);
+ var cq = environment.CompletionQueues.First(); // any cq will do
+ handle.ShutdownAndNotify(HandleServerShutdown, cq);
handle.CancelAllCalls();
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
@@ -244,11 +253,11 @@ namespace Grpc.Core
/// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
- private void AllowOneRpc()
+ private void AllowOneRpc(CompletionQueueSafeHandle cq)
{
if (!shutdownRequested)
{
- handle.RequestCall(HandleNewServerRpc, environment);
+ handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
}
}
@@ -265,7 +274,7 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
- private async Task HandleCallAsync(ServerRpcNew newRpc)
+ private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
try
{
@@ -274,7 +283,7 @@ namespace Grpc.Core
{
callHandler = NoSuchMethodCallHandler.Instance;
}
- await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false);
+ await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
}
catch (Exception e)
{
@@ -285,9 +294,9 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
- private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
+ private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq)
{
- Task.Run(() => AllowOneRpc());
+ Task.Run(() => AllowOneRpc(cq));
if (success)
{
@@ -296,7 +305,7 @@ namespace Grpc.Core
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
- HandleCallAsync(newRpc); // we don't need to await.
+ HandleCallAsync(newRpc, cq); // we don't need to await.
}
}
}
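Start now seeds InitialAllowRpcTokenCountPerCq outstanding RequestCall slots on every completion queue, and HandleNewServerRpc re-arms the queue its event arrived on, so the per-queue count of pending accept slots stays roughly constant. A toy illustration of the seeding loop (stand-in types, not the gRPC API):

    using System;
    using System.Collections.Generic;

    // Models the nested seeding loop above: after Start, every queue holds
    // InitialAllowRpcTokenCountPerCq outstanding "accept one RPC" slots.
    class TokenSeedingDemo
    {
        const int InitialAllowRpcTokenCountPerCq = 10;

        static void Main()
        {
            var completionQueues = new[] { "cq-0", "cq-1", "cq-2" };
            var pendingPerCq = new Dictionary<string, int>();
            for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
            {
                foreach (var cq in completionQueues)
                {
                    int n;
                    pendingPerCq.TryGetValue(cq, out n);
                    pendingPerCq[cq] = n + 1;  // one more outstanding RequestCall
                }
            }
            foreach (var kv in pendingPerCq)
            {
                Console.WriteLine("{0}: {1} pending accept slots", kv.Key, kv.Value);
            }
        }
    }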
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 1541cfd7bb..aea40afee2 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -471,8 +471,16 @@ namespace Grpc.IntegrationTesting
cts.Cancel();
- var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
- Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+ try
+ {
+ // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+ await call.ResponseStream.MoveNext();
+ Assert.Fail();
+ }
+ catch (RpcException ex)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+ }
}
Console.WriteLine("Passed!");
}
@@ -497,9 +505,16 @@ namespace Grpc.IntegrationTesting
// Deadline was reached before write has started. Eat the exception and continue.
}
- var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
- // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
- Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
+ try
+ {
+ await call.ResponseStream.MoveNext();
+ Assert.Fail();
+ }
+ catch (RpcException ex)
+ {
+ // We can't guarantee the status code is always DeadlineExceeded. See issue #2685.
+ Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
+ }
}
Console.WriteLine("Passed!");
}
@@ -577,9 +592,17 @@ namespace Grpc.IntegrationTesting
await call.RequestStream.WriteAsync(request);
await call.RequestStream.CompleteAsync();
- var e = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.ToListAsync());
- Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
- Assert.AreEqual(echoStatus.Message, e.Status.Detail);
+ try
+ {
+ // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+ await call.ResponseStream.ToListAsync();
+ Assert.Fail();
+ }
+ catch (RpcException e)
+ {
+ Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
+ Assert.AreEqual(echoStatus.Message, e.Status.Detail);
+ }
}
Console.WriteLine("Passed!");
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 5b8ff9b819..4beef9ded8 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -806,11 +806,14 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_set_credentials(
/* Server */
GPR_EXPORT grpc_server *GPR_CALLTYPE
-grpcsharp_server_create(grpc_completion_queue *cq,
- const grpc_channel_args *args) {
- grpc_server *server = grpc_server_create(args, NULL);
+grpcsharp_server_create(const grpc_channel_args *args) {
+ return grpc_server_create(args, NULL);
+}
+
+GPR_EXPORT void GPR_CALLTYPE
+grpcsharp_server_register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq) {
grpc_server_register_completion_queue(server, cq, NULL);
- return server;
}
GPR_EXPORT int32_t GPR_CALLTYPE
diff --git a/src/objective-c/CronetFramework.podspec b/src/objective-c/CronetFramework.podspec
new file mode 100644
index 0000000000..20af7647f7
--- /dev/null
+++ b/src/objective-c/CronetFramework.podspec
@@ -0,0 +1,43 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+Pod::Spec.new do |s|
+ s.name = "CronetFramework"
+ s.version = "0.0.2"
+ s.summary = "Cronet, precompiled and used as a framework."
+ s.homepage = "http://chromium.org"
+ s.license = { :type => 'BSD' }
+ s.vendored_framework = "Cronet.framework"
+ s.author = "The Chromium Authors"
+ s.ios.deployment_target = "8.0"
+ s.source = { :http => 'https://storage.googleapis.com/grpc-precompiled-binaries/cronet/Cronet.framework.zip' }
+ s.preserve_paths = "Cronet.framework"
+ s.public_header_files = "Cronet.framework/Headers/**/*{.h}"
+end
diff --git a/src/objective-c/GRPCClient/GRPCCall+Cronet.h b/src/objective-c/GRPCClient/GRPCCall+Cronet.h
new file mode 100644
index 0000000000..7f7fc6c2a9
--- /dev/null
+++ b/src/objective-c/GRPCClient/GRPCCall+Cronet.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+#import <Cronet/Cronet.h>
+
+#import "GRPCCall.h"
+
+/**
+ * Methods for using cronet transport.
+ */
+@interface GRPCCall (Cronet)
+
+/**
+ * This method should be called before issuing the first RPC and must be
+ * called only once. Create an instance of the Cronet engine elsewhere in
+ * your app and pass the instance pointer in the cronet_engine parameter.
+ * Once set, all subsequent RPCs will use the Cronet transport. This method
+ * is not thread safe.
+ */
++(void)useCronetWithEngine:(cronet_engine *)engine;
+
++(cronet_engine *)cronetEngine;
+
++(BOOL)isUsingCronet;
+
+@end
diff --git a/src/objective-c/GRPCClient/GRPCCall+Cronet.m b/src/objective-c/GRPCClient/GRPCCall+Cronet.m
new file mode 100644
index 0000000000..69a410e95a
--- /dev/null
+++ b/src/objective-c/GRPCClient/GRPCCall+Cronet.m
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#import "GRPCCall+Cronet.h"
+
+static BOOL useCronet = NO;
+static cronet_engine *globalCronetEngine;
+
+@implementation GRPCCall (Cronet)
+
++ (void)useCronetWithEngine:(cronet_engine *)engine {
+ useCronet = YES;
+ globalCronetEngine = engine;
+}
+
++ (cronet_engine *)cronetEngine {
+ return globalCronetEngine;
+}
+
++ (BOOL)isUsingCronet {
+ return useCronet;
+}
+
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h
index 70d1a9bd2f..3219835d02 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.h
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.h
@@ -56,6 +56,12 @@ struct grpc_channel_credentials;
+ (nullable GRPCChannel *)secureChannelWithHost:(nonnull NSString *)host;
/**
+ * Creates a secure channel to the specified @c host using Cronet as a transport mechanism.
+ */
++ (nullable GRPCChannel *)secureCronetChannelWithHost:(NSString *)host
+ channelArgs:(NSDictionary *)channelArgs;
+
+/**
* Creates a secure channel to the specified @c host using the specified @c credentials and
* @c channelArgs. Only in tests should @c GRPC_SSL_TARGET_NAME_OVERRIDE_ARG channel arg be set.
*/
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m
index 203ef58c0d..e4e0dbe6d2 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.m
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.m
@@ -34,10 +34,13 @@
#import "GRPCChannel.h"
#include <grpc/grpc_security.h>
+#include <grpc/grpc_cronet.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#import <Cronet/Cronet.h>
+#import <GRPCClient/GRPCCall+Cronet.h>
#import "GRPCCompletionQueue.h"
void freeChannelArgs(grpc_channel_args *channel_args) {
@@ -99,6 +102,22 @@ grpc_channel_args * buildChannelArgs(NSDictionary *dictionary) {
grpc_channel_args *_channelArgs;
}
+- (instancetype)initWithHost:(NSString *)host
+ cronetEngine:(cronet_engine *)cronetEngine
+ channelArgs:(NSDictionary *)channelArgs {
+ if (!host) {
+ [NSException raise:NSInvalidArgumentException format:@"host argument missing"];
+ }
+
+ if (self = [super init]) {
+ _channelArgs = buildChannelArgs(channelArgs);
+ _host = [host copy];
+ _unmanagedChannel = grpc_cronet_secure_channel_create(cronetEngine, _host.UTF8String, _channelArgs,
+ NULL);
+ }
+
+ return self;
+}
- (instancetype)initWithHost:(NSString *)host
secure:(BOOL)secure
@@ -133,6 +152,17 @@ grpc_channel_args * buildChannelArgs(NSDictionary *dictionary) {
freeChannelArgs(_channelArgs);
}
++ (GRPCChannel *)secureCronetChannelWithHost:(NSString *)host
+ channelArgs:(NSDictionary *)channelArgs {
+ cronet_engine *engine = [GRPCCall cronetEngine];
+ if (!engine) {
+ [NSException raise:NSInvalidArgumentException
+ format:@"cronet_engine is NULL. Set it first."];
+ return nil;
+ }
+ return [[GRPCChannel alloc] initWithHost:host cronetEngine:engine channelArgs:channelArgs];
+}
+
+ (GRPCChannel *)secureChannelWithHost:(NSString *)host {
return [[GRPCChannel alloc] initWithHost:host secure:YES credentials:NULL channelArgs:NULL];
}
diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m
index 43166cbb52..7da508810c 100644
--- a/src/objective-c/GRPCClient/private/GRPCHost.m
+++ b/src/objective-c/GRPCClient/private/GRPCHost.m
@@ -37,6 +37,7 @@
#include <grpc/grpc_security.h>
#import <GRPCClient/GRPCCall.h>
#import <GRPCClient/GRPCCall+ChannelArg.h>
+#import <GRPCClient/GRPCCall+Cronet.h>
#import "GRPCChannel.h"
#import "GRPCCompletionQueue.h"
@@ -200,15 +201,21 @@ NS_ASSUME_NONNULL_BEGIN
- (GRPCChannel *)newChannel {
NSDictionary *args = [self channelArgs];
+ BOOL useCronet = [GRPCCall isUsingCronet];
if (_secure) {
GRPCChannel *channel;
@synchronized(self) {
if (_channelCreds == nil) {
[self setTLSPEMRootCerts:nil withPrivateKey:nil withCertChain:nil error:nil];
}
- channel = [GRPCChannel secureChannelWithHost:_address
- credentials:_channelCreds
- channelArgs:args];
+ if (useCronet) {
+ channel = [GRPCChannel secureCronetChannelWithHost:_address
+ channelArgs:args];
+ } else {
+ channel = [GRPCChannel secureChannelWithHost:_address
+ credentials:_channelCreds
+ channelArgs:args];
+ }
}
return channel;
} else {
diff --git a/src/objective-c/tests/InteropTests.m b/src/objective-c/tests/InteropTests.m
index 4f096b9efa..781c500f73 100644
--- a/src/objective-c/tests/InteropTests.m
+++ b/src/objective-c/tests/InteropTests.m
@@ -35,7 +35,9 @@
#include <grpc/status.h>
+#import <Cronet/Cronet.h>
#import <GRPCClient/GRPCCall+Tests.h>
+#import <GRPCClient/GRPCCall+Cronet.h>
#import <ProtoRPC/ProtoRPC.h>
#import <RemoteTest/Empty.pbobjc.h>
#import <RemoteTest/Messages.pbobjc.h>
@@ -78,6 +80,8 @@
#pragma mark Tests
+static cronet_engine *cronetEngine = NULL;
+
@implementation InteropTests {
RMTTestService *_service;
}
@@ -88,6 +92,15 @@
- (void)setUp {
_service = self.class.host ? [RMTTestService serviceWithHost:self.class.host] : nil;
+#ifdef GRPC_COMPILE_WITH_CRONET
+ if (cronetEngine == NULL) {
+ // Cronet setup
+ [Cronet setHttp2Enabled:YES];
+ [Cronet start];
+ cronetEngine = [Cronet getGlobalEngine];
+ [GRPCCall useCronetWithEngine:cronetEngine];
+ }
+#endif
}
- (void)testEmptyUnaryRPC {
@@ -245,6 +258,8 @@
[self waitForExpectationsWithTimeout:4 handler:nil];
}
+#ifndef GRPC_COMPILE_WITH_CRONET
+// TODO(makdharma@): Fix this test
- (void)testEmptyStreamRPC {
XCTAssertNotNil(self.class.host);
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyStream"];
@@ -258,6 +273,7 @@
}];
[self waitForExpectationsWithTimeout:2 handler:nil];
}
+#endif
- (void)testCancelAfterBeginRPC {
XCTAssertNotNil(self.class.host);
diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile
index 7ec7a25898..508641d681 100644
--- a/src/objective-c/tests/Podfile
+++ b/src/objective-c/tests/Podfile
@@ -3,6 +3,7 @@ platform :ios, '8.0'
pod 'Protobuf', :path => "../../../third_party/protobuf"
pod 'BoringSSL', :podspec => ".."
+pod 'CronetFramework', :podspec => ".."
pod 'gRPC', :path => "../../.."
pod 'RemoteTest', :path => "RemoteTestClient"
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index b844a14c48..86447314b6 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -27,5 +27,763 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+"""gRPC's Python API."""
+
__import__('pkg_resources').declare_namespace(__name__)
+import abc
+import enum
+
+import six
+
+from grpc._cython import cygrpc as _cygrpc
+
+
+############################## Future Interface ###############################
+
+
+class FutureTimeoutError(Exception):
+ """Indicates that a method call on a Future timed out."""
+
+
+class FutureCancelledError(Exception):
+ """Indicates that the computation underlying a Future was cancelled."""
+
+
+class Future(six.with_metaclass(abc.ABCMeta)):
+ """A representation of a computation in another control flow.
+
+ Computations represented by a Future may not yet have begun, may be ongoing,
+ or may have already completed.
+ """
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Attempts to cancel the computation.
+
+ This method does not block.
+
+ Returns:
+ True if the computation has not yet begun, will not be allowed to take
+ place, and determination of both was possible without blocking. False
+ under all other circumstances including but not limited to the
+ computation's already having begun, the computation's already having
+ finished, and the computation's having been scheduled for execution on a
+ remote system for which a determination of whether or not it commenced
+ before being cancelled cannot be made without blocking.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancelled(self):
+ """Describes whether the computation was cancelled.
+
+ This method does not block.
+
+ Returns:
+ True if the computation was cancelled any time before its result became
+ immediately available. False under all other circumstances including but
+ not limited to this object's cancel method not having been called and
+ the computation's result having become immediately available.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def running(self):
+ """Describes whether the computation is taking place.
+
+ This method does not block.
+
+ Returns:
+ True if the computation is scheduled to take place in the future or is
+ taking place now, or False if the computation took place in the past or
+ was cancelled.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def done(self):
+ """Describes whether the computation has taken place.
+
+ This method does not block.
+
+ Returns:
+ True if the computation is known to have either completed or have been
+ unscheduled or interrupted. False if the computation may possibly be
+ executing or scheduled to execute later.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def result(self, timeout=None):
+ """Accesses the outcome of the computation or raises its exception.
+
+ This method may return immediately or may block.
+
+ Args:
+ timeout: The length of time in seconds to wait for the computation to
+ finish or be cancelled, or None if this method should block until the
+ computation has finished or is cancelled no matter how long that takes.
+
+ Returns:
+ The return value of the computation.
+
+ Raises:
+ FutureTimeoutError: If a timeout value is passed and the computation does
+ not terminate within the allotted time.
+ FutureCancelledError: If the computation was cancelled.
+ Exception: If the computation raised an exception, this call will raise
+ the same exception.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def exception(self, timeout=None):
+ """Return the exception raised by the computation.
+
+ This method may return immediately or may block.
+
+ Args:
+ timeout: The length of time in seconds to wait for the computation to
+ terminate or be cancelled, or None if this method should block until
+ the computation is terminated or is cancelled no matter how long that
+ takes.
+
+ Returns:
+ The exception raised by the computation, or None if the computation did
+ not raise an exception.
+
+ Raises:
+ FutureTimeoutError: If a timeout value is passed and the computation does
+ not terminate within the allotted time.
+ FutureCancelledError: If the computation was cancelled.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def traceback(self, timeout=None):
+ """Access the traceback of the exception raised by the computation.
+
+ This method may return immediately or may block.
+
+ Args:
+ timeout: The length of time in seconds to wait for the computation to
+ terminate or be cancelled, or None if this method should block until
+ the computation is terminated or is cancelled no matter how long that
+ takes.
+
+ Returns:
+ The traceback of the exception raised by the computation, or None if the
+ computation did not raise an exception.
+
+ Raises:
+ FutureTimeoutError: If a timeout value is passed and the computation does
+ not terminate within the allotted time.
+ FutureCancelledError: If the computation was cancelled.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_done_callback(self, fn):
+ """Adds a function to be called at completion of the computation.
+
+ The callback will be passed this Future object describing the outcome of
+ the computation.
+
+ If the computation has already completed, the callback will be called
+ immediately.
+
+ Args:
+ fn: A callable taking this Future object as its single parameter.
+ """
+ raise NotImplementedError()
+
+
+################################ gRPC Enums ##################################
+
+
+@enum.unique
+class ChannelConnectivity(enum.Enum):
+ """Mirrors grpc_connectivity_state in the gRPC Core.
+
+ Attributes:
+ IDLE: The channel is idle.
+ CONNECTING: The channel is connecting.
+ READY: The channel is ready to conduct RPCs.
+ TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
+ recover.
+ FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
+ """
+ IDLE = (_cygrpc.ConnectivityState.idle, 'idle')
+ CONNECTING = (_cygrpc.ConnectivityState.connecting, 'connecting')
+ READY = (_cygrpc.ConnectivityState.ready, 'ready')
+ TRANSIENT_FAILURE = (
+ _cygrpc.ConnectivityState.transient_failure, 'transient failure')
+ FATAL_FAILURE = (_cygrpc.ConnectivityState.fatal_failure, 'fatal failure')
+
+
+@enum.unique
+class StatusCode(enum.Enum):
+ """Mirrors grpc_status_code in the gRPC Core."""
+ OK = (_cygrpc.StatusCode.ok, 'ok')
+ CANCELLED = (_cygrpc.StatusCode.cancelled, 'cancelled')
+ UNKNOWN = (_cygrpc.StatusCode.unknown, 'unknown')
+ INVALID_ARGUMENT = (
+ _cygrpc.StatusCode.invalid_argument, 'invalid argument')
+ DEADLINE_EXCEEDED = (
+ _cygrpc.StatusCode.deadline_exceeded, 'deadline exceeded')
+ NOT_FOUND = (_cygrpc.StatusCode.not_found, 'not found')
+ ALREADY_EXISTS = (_cygrpc.StatusCode.already_exists, 'already exists')
+ PERMISSION_DENIED = (
+ _cygrpc.StatusCode.permission_denied, 'permission denied')
+ RESOURCE_EXHAUSTED = (
+ _cygrpc.StatusCode.resource_exhausted, 'resource exhausted')
+ FAILED_PRECONDITION = (
+ _cygrpc.StatusCode.failed_precondition, 'failed precondition')
+ ABORTED = (_cygrpc.StatusCode.aborted, 'aborted')
+ OUT_OF_RANGE = (_cygrpc.StatusCode.out_of_range, 'out of range')
+ UNIMPLEMENTED = (_cygrpc.StatusCode.unimplemented, 'unimplemented')
+ INTERNAL = (_cygrpc.StatusCode.internal, 'internal')
+ UNAVAILABLE = (_cygrpc.StatusCode.unavailable, 'unavailable')
+ DATA_LOSS = (_cygrpc.StatusCode.data_loss, 'data loss')
+ UNAUTHENTICATED = (_cygrpc.StatusCode.unauthenticated, 'unauthenticated')
+
+
+############################# gRPC Exceptions ################################
+
+
+class RpcError(Exception):
+ """Raised by the gRPC library to indicate non-OK-status RPC termination."""
+
+
+############################## Shared Context ################################
+
+
+class RpcContext(six.with_metaclass(abc.ABCMeta)):
+ """Provides RPC-related information and action."""
+
+ @abc.abstractmethod
+ def is_active(self):
+ """Describes whether the RPC is active or has terminated."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def time_remaining(self):
+ """Describes the length of allowed time remaining for the RPC.
+
+ Returns:
+ A nonnegative float indicating the length of allowed time in seconds
+ remaining for the RPC to complete before it is considered to have timed
+ out, or None if no deadline was specified for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the RPC.
+
+ Idempotent and has no effect if the RPC has already terminated.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_callback(self, callback):
+ """Registers a callback to be called on RPC termination.
+
+ Args:
+ callback: A no-parameter callable to be called on RPC termination.
+
+ Returns:
+ True if the callback was added and will be called later; False if the
+ callback was not added and will not later be called (because the RPC
+      already terminated or for some other reason).
+ """
+ raise NotImplementedError()
+
+
+######################### Invocation-Side Context ############################
+
+
+class Call(six.with_metaclass(abc.ABCMeta, RpcContext)):
+ """Invocation-side utility object for an RPC."""
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Accesses the initial metadata from the service-side of the RPC.
+
+ This method blocks until the value is available.
+
+ Returns:
+ The initial metadata as a sequence of pairs of bytes.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def trailing_metadata(self):
+ """Accesses the trailing metadata from the service-side of the RPC.
+
+ This method blocks until the value is available.
+
+ Returns:
+ The trailing metadata as a sequence of pairs of bytes.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def code(self):
+ """Accesses the status code emitted by the service-side of the RPC.
+
+ This method blocks until the value is available.
+
+ Returns:
+ The StatusCode value for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def details(self):
+ """Accesses the details value emitted by the service-side of the RPC.
+
+ This method blocks until the value is available.
+
+ Returns:
+ The bytes of the details of the RPC.
+ """
+ raise NotImplementedError()
+
+
+######################## Multi-Callable Interfaces ###########################
+
+
+class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
+ """Affords invoking a unary-unary RPC."""
+
+ @abc.abstractmethod
+ def __call__(self, request, timeout=None, metadata=None, with_call=False):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ timeout: An optional duration of time in seconds to allow for the RPC.
+ metadata: An optional sequence of pairs of bytes to be transmitted to the
+ service-side of the RPC.
+      with_call: Whether or not to return a Call for the RPC in addition
+ to the response.
+
+ Returns:
+ The response value for the RPC, and a Call for the RPC if with_call was
+ set to True at invocation.
+
+ Raises:
+ RpcError: Indicating that the RPC terminated with non-OK status. The
+ raised RpcError will also be a Call for the RPC affording the RPC's
+ metadata, status code, and details.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def future(self, request, timeout=None, metadata=None):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ timeout: An optional duration of time in seconds to allow for the RPC.
+ metadata: An optional sequence of pairs of bytes to be transmitted to the
+ service-side of the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and a Future. In the event of
+      RPC completion, the returned Future's result value will be the response
+      message of the RPC. Should the RPC terminate with non-OK status, the
+ returned Future's exception value will be an RpcError.
+ """
+ raise NotImplementedError()
+
+
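To make the two call styles concrete, here is a sketch of blocking and future-based invocation of a UnaryUnaryMultiCallable; the `multi_callable` name is assumed to have come from Channel.unary_unary described below:

    # Blocking style: returns the response, plus the Call when requested.
    response, call = multi_callable(b'request', timeout=5.0, with_call=True)
    print(call.code(), call.details())

    # Future style: the returned object is both a Call and a Future.
    response_future = multi_callable.future(b'request', timeout=5.0)
    response_future.add_done_callback(
        lambda future: print('RPC finished with code', future.code()))
    response = response_future.result()  # raises RpcError on non-OK status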
+class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
+ """Affords invoking a unary-stream RPC."""
+
+ @abc.abstractmethod
+ def __call__(self, request, timeout=None, metadata=None):
+ """Invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ timeout: An optional duration of time in seconds to allow for the RPC.
+ metadata: An optional sequence of pairs of bytes to be transmitted to the
+ service-side of the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and an iterator of response
+ values. Drawing response values from the returned iterator may raise
+ RpcError indicating termination of the RPC with non-OK status.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
+ """Affords invoking a stream-unary RPC in any call style."""
+
+ @abc.abstractmethod
+ def __call__(
+ self, request_iterator, timeout=None, metadata=None, with_call=False):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: An optional duration of time in seconds to allow for the RPC.
+ metadata: An optional sequence of pairs of bytes to be transmitted to the
+ service-side of the RPC.
+      with_call: Whether or not to return a Call for the RPC in addition
+ to the response.
+
+ Returns:
+ The response value for the RPC, and a Call for the RPC if with_call was
+ set to True at invocation.
+
+ Raises:
+ RpcError: Indicating that the RPC terminated with non-OK status. The
+ raised RpcError will also be a Call for the RPC affording the RPC's
+ metadata, status code, and details.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def future(self, request_iterator, timeout=None, metadata=None):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: An optional duration of time in seconds to allow for the RPC.
+ metadata: An optional sequence of pairs of bytes to be transmitted to the
+ service-side of the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and a Future. In the event of
+      RPC completion, the returned Future's result value will be the response
+      message of the RPC. Should the RPC terminate with non-OK status, the
+ returned Future's exception value will be an RpcError.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
+ """Affords invoking a stream-stream RPC in any call style."""
+
+ @abc.abstractmethod
+ def __call__(self, request_iterator, timeout=None, metadata=None):
+ """Invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: An optional duration of time in seconds to allow for the RPC.
+ metadata: An optional sequence of pairs of bytes to be transmitted to the
+ service-side of the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and an iterator of response
+ values. Drawing response values from the returned iterator may raise
+ RpcError indicating termination of the RPC with non-OK status.
+ """
+ raise NotImplementedError()
+
+
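A sketch of the streaming call shapes, assuming `unary_stream` and `stream_stream` multi-callables created from a Channel as described in the next section:

    # Server streaming: the returned object is iterated for responses;
    # drawing from the iterator may raise RpcError on non-OK termination.
    for response in unary_stream(b'request', timeout=30.0):
        print(response)

    # Bidirectional streaming: supply a request iterator, consume responses.
    def request_iterator():
        for payload in (b'one', b'two', b'three'):
            yield payload

    for response in stream_stream(request_iterator(), timeout=30.0):
        print(response)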
+############################# Channel Interface ##############################
+
+
+class Channel(six.with_metaclass(abc.ABCMeta)):
+ """Affords RPC invocation via generic methods."""
+
+ @abc.abstractmethod
+ def subscribe(self, callback, try_to_connect=False):
+ """Subscribes to this Channel's connectivity.
+
+ Args:
+ callback: A callable to be invoked and passed a ChannelConnectivity value
+ describing this Channel's connectivity. The callable will be invoked
+ immediately upon subscription and again for every change to this
+ Channel's connectivity thereafter until it is unsubscribed or this
+ Channel object goes out of scope.
+ try_to_connect: A boolean indicating whether or not this Channel should
+ attempt to connect if it is not already connected and ready to conduct
+ RPCs.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unsubscribe(self, callback):
+ """Unsubscribes a callback from this Channel's connectivity.
+
+ Args:
+      callback: A callable previously registered with this Channel by having
+ been passed to its "subscribe" method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_unary(
+ self, method, request_serializer=None, response_deserializer=None):
+ """Creates a UnaryUnaryMultiCallable for a unary-unary method.
+
+ Args:
+      method: The name of the RPC method.
+      request_serializer: Optional behavior for serializing the request
+        message. The request goes unserialized if None is passed.
+      response_deserializer: Optional behavior for deserializing the response
+        message. The response goes undeserialized if None is passed.
+
+ Returns:
+ A UnaryUnaryMultiCallable value for the named unary-unary method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_stream(
+ self, method, request_serializer=None, response_deserializer=None):
+ """Creates a UnaryStreamMultiCallable for a unary-stream method.
+
+ Args:
+      method: The name of the RPC method.
+      request_serializer: Optional behavior for serializing the request
+        message. The request goes unserialized if None is passed.
+      response_deserializer: Optional behavior for deserializing the response
+        message. The response goes undeserialized if None is passed.
+
+ Returns:
+      A UnaryStreamMultiCallable value for the named unary-stream method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_unary(
+ self, method, request_serializer=None, response_deserializer=None):
+ """Creates a StreamUnaryMultiCallable for a stream-unary method.
+
+ Args:
+      method: The name of the RPC method.
+      request_serializer: Optional behavior for serializing the request
+        message. The request goes unserialized if None is passed.
+      response_deserializer: Optional behavior for deserializing the response
+        message. The response goes undeserialized if None is passed.
+
+ Returns:
+ A StreamUnaryMultiCallable value for the named stream-unary method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_stream(
+ self, method, request_serializer=None, response_deserializer=None):
+ """Creates a StreamStreamMultiCallable for a stream-stream method.
+
+ Args:
+      method: The name of the RPC method.
+      request_serializer: Optional behavior for serializing the request
+        message. The request goes unserialized if None is passed.
+      response_deserializer: Optional behavior for deserializing the response
+        message. The response goes undeserialized if None is passed.
+
+ Returns:
+ A StreamStreamMultiCallable value for the named stream-stream method.
+ """
+ raise NotImplementedError()
+
+
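A sketch of typical Channel use, assuming a `channel` obtained from whatever channel constructor this package exposes (not shown in this hunk); the method path is illustrative:

    def on_connectivity(connectivity):
        # Invoked immediately upon subscription, then on every change.
        print('channel connectivity:', connectivity)

    channel.subscribe(on_connectivity, try_to_connect=True)

    # With serializers left as None, requests and responses are raw bytes.
    echo = channel.unary_unary(
        '/example.Echo/UnaryEcho',
        request_serializer=None,
        response_deserializer=None)

    channel.unsubscribe(on_connectivity)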
+########################## Service-Side Context ##############################
+
+
+class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
+ """A context object passed to method implementations."""
+
+ @abc.abstractmethod
+ def invocation_metadata(self):
+ """Accesses the metadata from the invocation-side of the RPC.
+
+ Returns:
+ The invocation metadata object as a sequence of pairs of bytes.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def peer(self):
+ """Identifies the peer that invoked the RPC being serviced.
+
+ Returns:
+ A string identifying the peer that invoked the RPC being serviced.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ """Sends the initial metadata value to the invocation-side of the RPC.
+
+ This method need not be called by method implementations if they have no
+ service-side initial metadata to transmit.
+
+ Args:
+ initial_metadata: The initial metadata of the RPC as a sequence of pairs
+ of bytes.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def set_trailing_metadata(self, trailing_metadata):
+ """Accepts the trailing metadata value of the RPC.
+
+ This method need not be called by method implementations if they have no
+ service-side trailing metadata to transmit.
+
+ Args:
+ trailing_metadata: The trailing metadata of the RPC as a sequence of pairs
+ of bytes.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def set_code(self, code):
+ """Accepts the status code of the RPC.
+
+ This method need not be called by method implementations if they wish the
+ gRPC runtime to determine the status code of the RPC.
+
+ Args:
+ code: The integer status code of the RPC to be transmitted to the
+ invocation side of the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def set_details(self, details):
+ """Accepts the service-side details of the RPC.
+
+ This method need not be called by method implementations if they have no
+ details to transmit.
+
+ Args:
+ details: The details bytes of the RPC to be transmitted to
+ the invocation side of the RPC.
+ """
+ raise NotImplementedError()
+
+
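A sketch of a unary-unary method implementation exercising ServicerContext; that set_code accepts the StatusCode enum values defined above is an assumption here, since the docstring speaks only of a status code to be transmitted:

    def echo_unary_unary(request, servicer_context):
        # Optional: initial metadata may be sent before the response.
        servicer_context.send_initial_metadata(((b'handled-by', b'echo'),))
        if not request:
            servicer_context.set_code(StatusCode.INVALID_ARGUMENT)
            servicer_context.set_details(b'request must be nonempty')
            return b''
        servicer_context.set_trailing_metadata(((b'outcome', b'ok'),))
        return request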
+##################### Service-Side Handler Interfaces ########################
+
+
+class RpcMethodHandler(six.with_metaclass(abc.ABCMeta)):
+ """An implementation of a single RPC method.
+
+ Attributes:
+ request_streaming: Whether the RPC supports exactly one request message or
+ any arbitrary number of request messages.
+ response_streaming: Whether the RPC supports exactly one response message or
+ any arbitrary number of response messages.
+ request_deserializer: A callable behavior that accepts a byte string and
+ returns an object suitable to be passed to this object's business logic,
+ or None to indicate that this object's business logic should be passed the
+ raw request bytes.
+ response_serializer: A callable behavior that accepts an object produced by
+ this object's business logic and returns a byte string, or None to
+ indicate that the byte strings produced by this object's business logic
+ should be transmitted on the wire as they are.
+ unary_unary: This object's application-specific business logic as a callable
+ value that takes a request value and a ServicerContext object and returns
+ a response value. Only non-None if both request_streaming and
+ response_streaming are False.
+ unary_stream: This object's application-specific business logic as a
+ callable value that takes a request value and a ServicerContext object and
+ returns an iterator of response values. Only non-None if request_streaming
+ is False and response_streaming is True.
+ stream_unary: This object's application-specific business logic as a
+ callable value that takes an iterator of request values and a
+ ServicerContext object and returns a response value. Only non-None if
+ request_streaming is True and response_streaming is False.
+ stream_stream: This object's application-specific business logic as a
+ callable value that takes an iterator of request values and a
+ ServicerContext object and returns an iterator of response values. Only
+ non-None if request_streaming and response_streaming are both True.
+ """
+
+
+class HandlerCallDetails(six.with_metaclass(abc.ABCMeta)):
+ """Describes an RPC that has just arrived for service.
+
+ Attributes:
+ method: The method name of the RPC.
+ invocation_metadata: The metadata from the invocation side of the RPC.
+ """
+
+
+class GenericRpcHandler(six.with_metaclass(abc.ABCMeta)):
+ """An implementation of arbitrarily many RPC methods."""
+
+ @abc.abstractmethod
+ def service(self, handler_call_details):
+ """Services an RPC (or not).
+
+ Args:
+ handler_call_details: A HandlerCallDetails describing the RPC.
+
+ Returns:
+ An RpcMethodHandler with which the RPC may be serviced, or None to
+ indicate that this object will not be servicing the RPC.
+ """
+ raise NotImplementedError()
+
+
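Tying the handler interfaces together: a minimal concrete RpcMethodHandler and a GenericRpcHandler servicing a single hypothetical method (the path and the `echo_unary_unary` behavior from the sketch above are illustrative):

    class _EchoMethodHandler(RpcMethodHandler):
        request_streaming = False
        response_streaming = False
        request_deserializer = None   # business logic receives raw bytes
        response_serializer = None    # business logic returns raw bytes
        unary_unary = staticmethod(echo_unary_unary)
        unary_stream = None
        stream_unary = None
        stream_stream = None

    class EchoGenericHandler(GenericRpcHandler):
        def service(self, handler_call_details):
            if handler_call_details.method == '/example.Echo/UnaryEcho':
                return _EchoMethodHandler()
            return None  # decline; another handler may service the RPC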
+############################# Server Interface ###############################
+
+
+class Server(six.with_metaclass(abc.ABCMeta)):
+ """Services RPCs."""
+
+ @abc.abstractmethod
+ def add_generic_rpc_handlers(self, generic_rpc_handlers):
+ """Registers GenericRpcHandlers with this Server.
+
+ This method is only safe to call before the server is started.
+
+ Args:
+ generic_rpc_handlers: An iterable of GenericRpcHandlers that will be used
+ to service RPCs after this Server is started.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_insecure_port(self, address):
+ """Reserves a port for insecure RPC service once this Server becomes active.
+
+    This method may only be called before this Server's start method is
+    called.
+
+ Args:
+ address: The address for which to open a port.
+
+ Returns:
+      An integer port on which RPCs will be serviced after this Server has
+      been started. This is typically the same number as the port number
+ in the passed address, but will likely be different if the port number
+ contained in the passed address was zero.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def start(self):
+ """Starts this Server's service of RPCs.
+
+ This method may only be called while the server is not serving RPCs (i.e. it
+ is not idempotent).
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stop(self, grace):
+ """Stops this Server's service of RPCs.
+
+ All calls to this method immediately stop service of new RPCs. When existing
+ RPCs are aborted is controlled by the grace period parameter passed to this
+ method.
+
+ This method may be called at any time and is idempotent. Passing a smaller
+ grace value than has been passed in a previous call will have the effect of
+ stopping the Server sooner. Passing a larger grace value than has been
+ passed in a previous call will not have the effect of stopping the server
+ later.
+
+ Args:
+ grace: A duration of time in seconds to allow existing RPCs to complete
+ before being aborted by this Server's stopping. If None, this method
+ will block until the server is completely stopped.
+
+ Returns:
+ A threading.Event that will be set when this Server has completely
+ stopped. The returned event may not be set until after the full grace
+ period (if some ongoing RPC continues for the full length of the period)
+      or it may be set much sooner (such as if this Server had no RPCs underway
+ at the time it was stopped or if all RPCs that it had underway completed
+ very early in the grace period).
+ """
+ raise NotImplementedError()
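A sketch of the server lifecycle these methods describe; how the Server instance itself is constructed is outside this interface:

    server = ...  # obtained from this package's server constructor
    server.add_generic_rpc_handlers((EchoGenericHandler(),))
    port = server.add_insecure_port('[::]:0')  # 0 asks for any free port
    server.start()
    # ... serve ...
    stopped_event = server.stop(grace=5.0)  # let in-flight RPCs finish
    stopped_event.wait()  # a threading.Event, per the docstring above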
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index be24dc7cf0..9c067add0a 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -94,6 +94,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
+ 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
diff --git a/src/python/grpcio/tests/unit/framework/common/test_control.py b/src/python/grpcio/tests/unit/framework/common/test_control.py
index ca5ba3a854..088e2f8b88 100644
--- a/src/python/grpcio/tests/unit/framework/common/test_control.py
+++ b/src/python/grpcio/tests/unit/framework/common/test_control.py
@@ -60,10 +60,16 @@ class Control(six.with_metaclass(abc.ABCMeta)):
class PauseFailControl(Control):
- """A Control that can be used to pause or fail code under control."""
+ """A Control that can be used to pause or fail code under control.
+
+  This object is only safe for use from two threads: one from the system under
+ test calling control and the other from the test system calling pause,
+ block_until_paused, and fail.
+ """
def __init__(self):
self._condition = threading.Condition()
+ self._pause = False
self._paused = False
self._fail = False
@@ -72,19 +78,31 @@ class PauseFailControl(Control):
if self._fail:
raise Defect()
- while self._paused:
+ while self._pause:
+ self._paused = True
+ self._condition.notify_all()
self._condition.wait()
+ self._paused = False
@contextlib.contextmanager
def pause(self):
"""Pauses code under control while controlling code is in context."""
with self._condition:
- self._paused = True
+ self._pause = True
yield
with self._condition:
- self._paused = False
+ self._pause = False
self._condition.notify_all()
+ def block_until_paused(self):
+ """Blocks controlling code until code under control is paused.
+
+ May only be called within the context of a pause call.
+ """
+ with self._condition:
+ while not self._paused:
+ self._condition.wait()
+
@contextlib.contextmanager
def fail(self):
"""Fails code under control while controlling code is in context."""
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index c3fd59b3c2..386ef7d427 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -807,6 +807,7 @@ src/core/lib/http/parser.h \
src/core/lib/iomgr/closure.h \
src/core/lib/iomgr/endpoint.h \
src/core/lib/iomgr/endpoint_pair.h \
+src/core/lib/iomgr/ev_poll_and_epoll_posix.h \
src/core/lib/iomgr/ev_poll_posix.h \
src/core/lib/iomgr/ev_posix.h \
src/core/lib/iomgr/exec_ctx.h \
@@ -954,6 +955,7 @@ src/core/lib/iomgr/closure.c \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
+src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_poll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \
diff --git a/tools/jenkins/run_full_performance.sh b/tools/jenkins/run_full_performance.sh
index bb1f79ff2c..3feda866f2 100755
--- a/tools/jenkins/run_full_performance.sh
+++ b/tools/jenkins/run_full_performance.sh
@@ -40,7 +40,7 @@ tools/run_tests/run_performance_tests.py \
--netperf \
--category all \
--bq_result_table performance_test.performance_experiment \
- --remote_worker_host grpc-performance-server-8core grpc-performance-client-8core \
+ --remote_worker_host grpc-performance-server-8core grpc-performance-client-8core grpc-performance-client2-8core \
|| EXIT_CODE=1
# scalability with 32cores (and upload to a different BQ table)
@@ -49,7 +49,7 @@ tools/run_tests/run_performance_tests.py \
--netperf \
--category scalable \
--bq_result_table performance_test.performance_experiment_32core \
- --remote_worker_host grpc-performance-server-32core grpc-performance-client-32core \
+ --remote_worker_host grpc-performance-server-32core grpc-performance-client-32core grpc-performance-client2-32core \
|| EXIT_CODE=1
exit $EXIT_CODE
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 0538dce419..0a5625c3f5 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -157,7 +157,7 @@ class CLanguage(object):
'windows': ['all'],
'mac': ['all'],
'posix': ['all'],
- 'linux': ['poll'],
+ 'linux': ['poll', 'legacy']
}
for target in binaries:
polling_strategies = (POLLING_STRATEGIES[self.platform]
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index 24d23fe28b..fe67426aa4 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -5530,6 +5530,7 @@
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
+ "src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
@@ -5629,6 +5630,8 @@
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
+ "src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
+ "src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_poll_posix.c",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.c",
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj
index e5379dc6a4..7a39070b00 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj
@@ -316,6 +316,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\exec_ctx.h" />
@@ -483,6 +484,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair_windows.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.c">
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
index 95a5a73d26..ae70e0ada0 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
@@ -55,6 +55,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair_windows.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@@ -674,6 +677,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
index 90ad80f2fc..09748f082c 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
@@ -304,6 +304,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\exec_ctx.h" />
@@ -449,6 +450,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair_windows.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.c">
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
index 2b19c0fb34..a85bfeefe6 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -58,6 +58,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair_windows.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@@ -572,6 +575,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>