aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Julien Boeuf <jboeuf@google.com>2015-08-05 21:29:42 -0700
committerGravatar Julien Boeuf <jboeuf@google.com>2015-08-05 21:29:42 -0700
commitd3368e419253b5dc82662740031723f1909a97ba (patch)
tree38504ff129245fd643bef86bbabf0d11395f1e47 /src
parent7c8d255527cbec8b261300296d61361ce94e9d18 (diff)
parent7098b1ba645074c47675a731243aa929c67bb46f (diff)
Merge branch 'master' of github.com:grpc/grpc into server_creds_auth_md_processor
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc8
-rw-r--r--src/core/client_config/subchannel.c6
-rw-r--r--src/core/iomgr/fd_posix.c24
-rw-r--r--src/core/iomgr/fd_posix.h9
-rw-r--r--src/core/iomgr/pollset.h23
-rw-r--r--src/core/iomgr/pollset_kick_posix.c168
-rw-r--r--src/core/iomgr/pollset_kick_posix.h93
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c162
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c123
-rw-r--r--src/core/iomgr/pollset_posix.c145
-rw-r--r--src/core/iomgr/pollset_posix.h33
-rw-r--r--src/core/iomgr/pollset_windows.c83
-rw-r--r--src/core/iomgr/pollset_windows.h12
-rw-r--r--src/core/iomgr/sockaddr_utils.c5
-rw-r--r--src/core/iomgr/tcp_server_windows.c29
-rw-r--r--src/core/iomgr/wakeup_fd_eventfd.c8
-rw-r--r--src/core/iomgr/wakeup_fd_pipe.c12
-rw-r--r--src/core/iomgr/wakeup_fd_posix.c10
-rw-r--r--src/core/iomgr/wakeup_fd_posix.h20
-rw-r--r--src/core/security/google_default_credentials.c6
-rw-r--r--src/core/surface/channel.c4
-rw-r--r--src/core/surface/completion_queue.c70
-rw-r--r--src/core/surface/completion_queue.h2
-rw-r--r--src/core/surface/server.c73
-rw-r--r--src/core/surface/server_chttp2.c2
-rw-r--r--src/core/transport/chttp2/internal.h6
-rw-r--r--src/core/transport/chttp2/stream_lists.c3
-rw-r--r--src/core/transport/chttp2/writing.c34
-rw-r--r--src/core/transport/chttp2_transport.c8
-rw-r--r--src/cpp/client/channel.cc12
-rw-r--r--src/cpp/client/channel.h5
-rw-r--r--src/cpp/client/create_channel.cc4
-rw-r--r--src/cpp/client/insecure_credentials.cc2
-rw-r--r--src/cpp/client/secure_credentials.cc3
-rw-r--r--src/cpp/server/insecure_server_credentials.cc2
-rw-r--r--src/cpp/server/server_context.cc23
-rw-r--r--src/csharp/Grpc.Auth/OAuth2Interceptors.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/ChannelTest.cs91
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs34
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj1
-rw-r--r--src/csharp/Grpc.Core.Tests/NUnitVersionTest.cs4
-rw-r--r--src/csharp/Grpc.Core.Tests/ServerTest.cs39
-rw-r--r--src/csharp/Grpc.Core.Tests/TimeoutsTest.cs19
-rw-r--r--src/csharp/Grpc.Core/Channel.cs108
-rw-r--r--src/csharp/Grpc.Core/ChannelOptions.cs4
-rw-r--r--src/csharp/Grpc.Core/ChannelState.cs69
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj9
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.nuspec1
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs31
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/KeyCertificatePair.cs1
-rw-r--r--src/csharp/Grpc.Core/Metadata.cs1
-rw-r--r--src/csharp/Grpc.Core/Server.cs175
-rw-r--r--src/csharp/Grpc.Core/ServerCredentials.cs28
-rw-r--r--src/csharp/Grpc.Core/ServerPort.cs120
-rw-r--r--src/csharp/Grpc.Core/ServerServiceDefinition.cs12
-rw-r--r--src/csharp/Grpc.Core/packages.config1
-rw-r--r--src/csharp/Grpc.Examples.MathServer/MathServer.cs15
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs20
-rw-r--r--src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs10
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj3
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs14
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropServer.cs10
-rw-r--r--src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs15
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TestCredentials.cs1
-rw-r--r--src/csharp/Grpc.IntegrationTesting/packages.config1
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c25
-rw-r--r--src/node/ext/server.cc4
-rw-r--r--src/objective-c/generated_libraries/RemoteTestClient/RemoteTest.podspec5
-rw-r--r--src/objective-c/generated_libraries/RouteGuideClient/RouteGuide.podspec5
-rwxr-xr-xsrc/objective-c/tests/build_tests.sh37
-rwxr-xr-xsrc/objective-c/tests/run_tests.sh3
-rw-r--r--src/php/ext/grpc/server.c2
-rw-r--r--src/python/grpcio/.gitignore1
-rw-r--r--src/python/grpcio/commands.py26
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/server.c2
-rw-r--r--src/python/grpcio/setup.py2
-rw-r--r--src/python/grpcio_test/grpc_test/_links/_transmission_test.py12
-rw-r--r--src/python/grpcio_test/grpc_test/test_common.py71
-rw-r--r--src/ruby/ext/grpc/rb_server.c3
81 files changed, 1467 insertions, 811 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 75659947df..ea487bcd89 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -119,6 +119,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
"#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/status.h>\n"
"#include <grpc++/stream.h>\n"
+ "#include <grpc++/stub_options.h>\n"
"\n"
"namespace grpc {\n"
"class CompletionQueue;\n"
@@ -574,8 +575,8 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
printer->Print("};\n");
printer->Print(
"static std::unique_ptr<Stub> NewStub(const std::shared_ptr< "
- "::grpc::ChannelInterface>& "
- "channel);\n");
+ "::grpc::ChannelInterface>& channel, "
+ "const ::grpc::StubOptions& options = ::grpc::StubOptions());\n");
printer->Print("\n");
@@ -966,7 +967,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
printer->Print(
*vars,
"std::unique_ptr< $ns$$Service$::Stub> $ns$$Service$::NewStub("
- "const std::shared_ptr< ::grpc::ChannelInterface>& channel) {\n"
+ "const std::shared_ptr< ::grpc::ChannelInterface>& channel, "
+ "const ::grpc::StubOptions& options) {\n"
" std::unique_ptr< $ns$$Service$::Stub> stub(new "
"$ns$$Service$::Stub(channel));\n"
" return stub;\n"
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 17773bd2f4..ca52c75beb 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -322,8 +322,8 @@ static void continue_connect(grpc_subchannel *c) {
static void start_connect(grpc_subchannel *c) {
c->backoff_delta = gpr_time_from_seconds(
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
- c->next_attempt = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
+ c->next_attempt =
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
continue_connect(c);
}
@@ -511,8 +511,6 @@ static void publish_transport(grpc_subchannel *c) {
connection *destroy_connection = NULL;
grpc_channel_element *elem;
- gpr_log(GPR_DEBUG, "publish_transport: %p", c->master);
-
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc(sizeof(*filters) * num_filters);
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index a2df838d4a..2d08a77a70 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -168,13 +168,19 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
+static void pollset_kick_locked(grpc_pollset *pollset) {
+ gpr_mu_lock(GRPC_POLLSET_MU(pollset));
+ grpc_pollset_kick(pollset, NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
+}
+
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
- grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset);
+ pollset_kick_locked(fd->inactive_watcher_root.next->pollset);
} else if (fd->read_watcher) {
- grpc_pollset_force_kick(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher->pollset);
} else if (fd->write_watcher) {
- grpc_pollset_force_kick(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher->pollset);
}
}
@@ -188,13 +194,13 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
grpc_fd_watcher *watcher;
for (watcher = fd->inactive_watcher_root.next;
watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
- grpc_pollset_force_kick(watcher->pollset);
+ pollset_kick_locked(watcher->pollset);
}
if (fd->read_watcher) {
- grpc_pollset_force_kick(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher->pollset);
}
if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
- grpc_pollset_force_kick(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher->pollset);
}
}
@@ -376,13 +382,15 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
return 0;
}
/* if there is nobody polling for read, but we need to, then start doing so */
- if (read_mask && !fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+ if (read_mask && !fd->read_watcher &&
+ (gpr_uintptr)gpr_atm_acq_load(&fd->readst) > READY) {
fd->read_watcher = watcher;
mask |= read_mask;
}
/* if there is nobody polling for write, but we need to, then start doing so
*/
- if (write_mask && !fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+ if (write_mask && !fd->write_watcher &&
+ (gpr_uintptr)gpr_atm_acq_load(&fd->writest) > READY) {
fd->write_watcher = watcher;
mask |= write_mask;
}
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 4e8e267ffd..835e9b339a 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -109,7 +109,8 @@ grpc_fd *grpc_fd_create(int fd, const char *name);
on_done is called when the underlying file descriptor is definitely close()d.
If on_done is NULL, no callback will be made.
Requires: *fd initialized; no outstanding notify_on_read or
- notify_on_write. */
+ notify_on_write.
+ MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason);
@@ -122,11 +123,13 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
i.e. a combination of read_mask and write_mask determined by the fd's current
interest in said events.
Polling strategies that do not need to alter their behavior depending on the
- fd's current interest (such as epoll) do not need to call this function. */
+ fd's current interest (such as epoll) do not need to call this function.
+ MUST NOT be called with a pollset lock taken */
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *rec);
-/* Complete polling previously started with grpc_fd_begin_poll */
+/* Complete polling previously started with grpc_fd_begin_poll
+ MUST NOT be called with a pollset lock taken */
void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write);
/* Return 1 if this fd is orphaned, 0 otherwise */
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index c40188b3c9..c474e4dbf1 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -37,6 +37,8 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
+#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
+
/* A grpc_pollset is a set of file descriptors that a higher level item is
interested in. For example:
- a server will typically keep a pollset containing all connected channels,
@@ -63,13 +65,24 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
descriptors.
Requires GRPC_POLLSET_MU(pollset) locked.
May unlock GRPC_POLLSET_MU(pollset) during its execution.
-
+
+ worker is a (platform-specific) handle that can be used to wake up
+ from grpc_pollset_work before any events are received and before the timeout
+ has expired. It is both initialized and destroyed by grpc_pollset_work.
+ Initialization of worker is guaranteed to occur BEFORE the
+ GRPC_POLLSET_MU(pollset) is released for the first time by
+ grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
+ not be released by grpc_pollset_work AFTER worker has been destroyed.
+
Returns true if some work has been done, and false if the deadline
- got attained. */
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline);
+ expired. */
+int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline);
/* Break one polling thread out of polling work for this pollset.
- Requires GRPC_POLLSET_MU(pollset) locked. */
-void grpc_pollset_kick(grpc_pollset *pollset);
+ If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
+ Otherwise, if specific_worker is non-NULL, then kick that worker. */
+void grpc_pollset_kick(grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */
diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick_posix.c
deleted file mode 100644
index 51021784f2..0000000000
--- a/src/core/iomgr/pollset_kick_posix.c
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/pollset_kick_posix.h"
-
-#include <errno.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "src/core/iomgr/socket_utils_posix.h"
-#include "src/core/iomgr/wakeup_fd_posix.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-/* This implementation is based on a freelist of wakeup fds, with extra logic to
- * handle kicks while there is no attached fd. */
-
-/* TODO(klempner): Autosize this, and consider providing a way to disable the
- * cap entirely on systems with large fd limits */
-#define GRPC_MAX_CACHED_WFDS 50
-
-static grpc_kick_fd_info *fd_freelist = NULL;
-static int fd_freelist_count = 0;
-static gpr_mu fd_freelist_mu;
-
-static grpc_kick_fd_info *allocate_wfd(void) {
- grpc_kick_fd_info *info = NULL;
- gpr_mu_lock(&fd_freelist_mu);
- if (fd_freelist != NULL) {
- info = fd_freelist;
- fd_freelist = fd_freelist->next;
- --fd_freelist_count;
- }
- gpr_mu_unlock(&fd_freelist_mu);
- if (info == NULL) {
- info = gpr_malloc(sizeof(*info));
- grpc_wakeup_fd_create(&info->wakeup_fd);
- info->next = NULL;
- }
- return info;
-}
-
-static void destroy_wfd(grpc_kick_fd_info *wfd) {
- grpc_wakeup_fd_destroy(&wfd->wakeup_fd);
- gpr_free(wfd);
-}
-
-static void free_wfd(grpc_kick_fd_info *fd_info) {
- gpr_mu_lock(&fd_freelist_mu);
- if (fd_freelist_count < GRPC_MAX_CACHED_WFDS) {
- fd_info->next = fd_freelist;
- fd_freelist = fd_info;
- fd_freelist_count++;
- fd_info = NULL;
- }
- gpr_mu_unlock(&fd_freelist_mu);
-
- if (fd_info) {
- destroy_wfd(fd_info);
- }
-}
-
-void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) {
- gpr_mu_init(&kick_state->mu);
- kick_state->kicked = 0;
- kick_state->fd_list.next = kick_state->fd_list.prev = &kick_state->fd_list;
-}
-
-void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) {
- gpr_mu_destroy(&kick_state->mu);
- GPR_ASSERT(kick_state->fd_list.next == &kick_state->fd_list);
-}
-
-grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
- grpc_pollset_kick_state *kick_state) {
- grpc_kick_fd_info *fd_info;
- gpr_mu_lock(&kick_state->mu);
- if (kick_state->kicked) {
- kick_state->kicked = 0;
- gpr_mu_unlock(&kick_state->mu);
- return NULL;
- }
- fd_info = allocate_wfd();
- fd_info->next = &kick_state->fd_list;
- fd_info->prev = fd_info->next->prev;
- fd_info->next->prev = fd_info->prev->next = fd_info;
- gpr_mu_unlock(&kick_state->mu);
- return fd_info;
-}
-
-void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info) {
- grpc_wakeup_fd_consume_wakeup(&fd_info->wakeup_fd);
-}
-
-void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info) {
- gpr_mu_lock(&kick_state->mu);
- fd_info->next->prev = fd_info->prev;
- fd_info->prev->next = fd_info->next;
- free_wfd(fd_info);
- gpr_mu_unlock(&kick_state->mu);
-}
-
-void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
- gpr_mu_lock(&kick_state->mu);
- if (kick_state->fd_list.next != &kick_state->fd_list) {
- grpc_wakeup_fd_wakeup(&kick_state->fd_list.next->wakeup_fd);
- } else {
- kick_state->kicked = 1;
- }
- gpr_mu_unlock(&kick_state->mu);
-}
-
-void grpc_pollset_kick_global_init_fallback_fd(void) {
- gpr_mu_init(&fd_freelist_mu);
- grpc_wakeup_fd_global_init_force_fallback();
-}
-
-void grpc_pollset_kick_global_init(void) {
- gpr_mu_init(&fd_freelist_mu);
- grpc_wakeup_fd_global_init();
-}
-
-void grpc_pollset_kick_global_destroy(void) {
- while (fd_freelist != NULL) {
- grpc_kick_fd_info *current = fd_freelist;
- fd_freelist = fd_freelist->next;
- destroy_wfd(current);
- }
- grpc_wakeup_fd_global_destroy();
- gpr_mu_destroy(&fd_freelist_mu);
-}
-
-#endif /* GPR_POSIX_SOCKET */
diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h
deleted file mode 100644
index 77e32a8d51..0000000000
--- a/src/core/iomgr/pollset_kick_posix.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H
-#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H
-
-#include "src/core/iomgr/wakeup_fd_posix.h"
-#include <grpc/support/sync.h>
-
-/* pollset kicking allows breaking a thread out of polling work for
- a given pollset.
- writing a byte to a pipe is used as a posix-ly portable base
- mechanism, and eventfds are utilized on Linux for better performance. */
-
-typedef struct grpc_kick_fd_info {
- grpc_wakeup_fd_info wakeup_fd;
- /* used for polling list and free list */
- struct grpc_kick_fd_info *next;
- /* only used when polling */
- struct grpc_kick_fd_info *prev;
-} grpc_kick_fd_info;
-
-typedef struct grpc_pollset_kick_state {
- gpr_mu mu;
- int kicked;
- struct grpc_kick_fd_info fd_list;
-} grpc_pollset_kick_state;
-
-#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) \
- GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd)
-
-/* This is an abstraction around the typical pipe mechanism for waking up a
- thread sitting in a poll() style call. */
-
-void grpc_pollset_kick_global_init(void);
-void grpc_pollset_kick_global_destroy(void);
-
-void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state);
-void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state);
-
-/* Guarantees a pure posix implementation rather than a specialized one, if
- * applicable. Intended for testing. */
-void grpc_pollset_kick_global_init_fallback_fd(void);
-
-/* Must be called before entering poll(). If return value is NULL, this consumed
- an existing kick. Otherwise the return value is an FD to add to the poll set.
- */
-grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
- grpc_pollset_kick_state *kick_state);
-
-/* Consume an existing kick. Must be called after poll returns that the fd was
- readable, and before calling kick_post_poll. */
-void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info);
-
-/* Must be called after pre_poll, and after consume if applicable */
-void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info);
-
-/* Actually kick */
-void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state);
-
-#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index d697b59e4c..1320c64579 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -36,6 +36,7 @@
#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
#include <errno.h>
+#include <poll.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
@@ -44,23 +45,28 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+typedef struct wakeup_fd_hdl {
+ grpc_wakeup_fd wakeup_fd;
+ struct wakeup_fd_hdl *next;
+} wakeup_fd_hdl;
+
+typedef struct {
+ grpc_pollset *pollset;
+ grpc_fd *fd;
+ grpc_iomgr_closure closure;
+} delayed_add;
+
typedef struct {
int epoll_fd;
- grpc_wakeup_fd_info wakeup_fd;
+ wakeup_fd_hdl *free_wakeup_fds;
} pollset_hdr;
-static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
- grpc_fd *fd,
- int and_unlock_pollset) {
+static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
grpc_fd_watcher watcher;
- if (and_unlock_pollset) {
- gpr_mu_unlock(&pollset->mu);
- }
-
/* We pretend to be polling whilst adding an fd to keep the fd from being
closed during the add. This may result in a spurious wakeup being assigned
to this pollset whilst adding, but that should be benign. */
@@ -80,6 +86,52 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd_end_poll(&watcher, 0, 0);
}
+static void perform_delayed_add(void *arg, int iomgr_status) {
+ delayed_add *da = arg;
+ int do_shutdown_cb = 0;
+
+ if (!grpc_fd_is_orphaned(da->fd)) {
+ finally_add_fd(da->pollset, da->fd);
+ }
+
+ gpr_mu_lock(&da->pollset->mu);
+ da->pollset->in_flight_cbs--;
+ if (da->pollset->shutting_down) {
+ /* We don't care about this pollset anymore. */
+ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
+ GPR_ASSERT(!grpc_pollset_has_workers(da->pollset));
+ da->pollset->called_shutdown = 1;
+ do_shutdown_cb = 1;
+ }
+ }
+ gpr_mu_unlock(&da->pollset->mu);
+
+ GRPC_FD_UNREF(da->fd, "delayed_add");
+
+ if (do_shutdown_cb) {
+ da->pollset->shutdown_done_cb(da->pollset->shutdown_done_arg);
+ }
+
+ gpr_free(da);
+}
+
+static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
+ grpc_fd *fd,
+ int and_unlock_pollset) {
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ finally_add_fd(pollset, fd);
+ } else {
+ delayed_add *da = gpr_malloc(sizeof(*da));
+ da->pollset = pollset;
+ da->fd = fd;
+ GRPC_FD_REF(fd, "delayed_add");
+ grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
+ pollset->in_flight_cbs++;
+ grpc_iomgr_add_callback(&da->closure);
+ }
+}
+
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
grpc_fd *fd,
int and_unlock_pollset) {
@@ -103,12 +155,14 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
#define GRPC_EPOLL_MAX_EVENTS 1000
static void multipoll_with_epoll_pollset_maybe_work(
- grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+ grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
+ gpr_timespec now, int allow_synchronous_callback) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int ep_rv;
+ int poll_rv;
pollset_hdr *h = pollset->data.ptr;
int timeout_ms;
+ struct pollfd pfds[2];
/* If you want to ignore epoll's ability to sanely handle parallel pollers,
* for a more apples-to-apples performance comparison with poll, add a
@@ -116,43 +170,58 @@ static void multipoll_with_epoll_pollset_maybe_work(
* here.
*/
- pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
- do {
- ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
- if (ep_rv < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
- }
- } else {
- int i;
- for (i = 0; i < ep_rv; ++i) {
- if (ep_ev[i].data.ptr == 0) {
- grpc_wakeup_fd_consume_wakeup(&h->wakeup_fd);
- } else {
- grpc_fd *fd = ep_ev[i].data.ptr;
- /* TODO(klempner): We might want to consider making err and pri
- * separate events */
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write = ep_ev[i].events & EPOLLOUT;
- if (read || cancel) {
- grpc_fd_become_readable(fd, allow_synchronous_callback);
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+ pfds[1].fd = h->epoll_fd;
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
+
+ poll_rv = poll(pfds, 2, timeout_ms);
+
+ if (poll_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ }
+ } else if (poll_rv == 0) {
+ /* do nothing */
+ } else {
+ if (pfds[0].revents) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
+ }
+ if (pfds[1].revents) {
+ do {
+ ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
+ if (ep_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
}
- if (write || cancel) {
- grpc_fd_become_writable(fd, allow_synchronous_callback);
+ } else {
+ int i;
+ for (i = 0; i < ep_rv; ++i) {
+ grpc_fd *fd = ep_ev[i].data.ptr;
+ /* TODO(klempner): We might want to consider making err and pri
+ * separate events */
+ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+ int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write = ep_ev[i].events & EPOLLOUT;
+ if (read || cancel) {
+ grpc_fd_become_readable(fd, allow_synchronous_callback);
+ }
+ if (write || cancel) {
+ grpc_fd_become_writable(fd, allow_synchronous_callback);
+ }
}
}
- }
+ } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
- timeout_ms = 0;
- } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
+ }
gpr_mu_lock(&pollset->mu);
- pollset->counter -= 1;
}
static void multipoll_with_epoll_pollset_finish_shutdown(
@@ -160,21 +229,14 @@ static void multipoll_with_epoll_pollset_finish_shutdown(
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
- grpc_wakeup_fd_destroy(&h->wakeup_fd);
close(h->epoll_fd);
gpr_free(h);
}
-static void epoll_kick(grpc_pollset *pollset) {
- pollset_hdr *h = pollset->data.ptr;
- grpc_wakeup_fd_wakeup(&h->wakeup_fd);
-}
-
static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
multipoll_with_epoll_pollset_add_fd,
multipoll_with_epoll_pollset_del_fd,
multipoll_with_epoll_pollset_maybe_work,
- epoll_kick,
multipoll_with_epoll_pollset_finish_shutdown,
multipoll_with_epoll_pollset_destroy};
@@ -182,8 +244,6 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
- struct epoll_event ev;
- int err;
pollset->vtable = &multipoll_with_epoll_pollset;
pollset->data.ptr = h;
@@ -196,16 +256,6 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
for (i = 0; i < nfds; i++) {
multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0);
}
-
- grpc_wakeup_fd_create(&h->wakeup_fd);
- ev.events = EPOLLIN;
- ev.data.ptr = 0;
- err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
- GRPC_WAKEUP_FD_GET_READ_FD(&h->wakeup_fd), &ev);
- if (err < 0) {
- gpr_log(GPR_ERROR, "Wakeup fd epoll_ctl failed: %s", strerror(errno));
- abort();
- }
}
grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 0084e83953..b5b2d7534d 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -53,12 +53,6 @@ typedef struct {
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
- /* fds being polled by the current poller: parallel arrays of pollfd, and
- a grpc_fd_watcher */
- size_t pfd_count;
- size_t pfd_capacity;
- grpc_fd_watcher *watchers;
- struct pollfd *pfds;
/* fds that have been removed from the pollset explicitly */
size_t del_count;
size_t del_capacity;
@@ -102,80 +96,60 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
}
}
-static void end_polling(grpc_pollset *pollset) {
- size_t i;
- pollset_hdr *h;
- h = pollset->data.ptr;
- for (i = 1; i < h->pfd_count; i++) {
- grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN,
- h->pfds[i].revents & POLLOUT);
- }
-}
-
static void multipoll_with_poll_pollset_maybe_work(
- grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+ grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
+ gpr_timespec now, int allow_synchronous_callback) {
int timeout;
int r;
- size_t i, np, nf, nd;
+ size_t i, j, pfd_count, fd_count;
pollset_hdr *h;
- grpc_kick_fd_info *kfd;
+ /* TODO(ctiller): inline some elements to avoid an allocation */
+ grpc_fd_watcher *watchers;
+ struct pollfd *pfds;
h = pollset->data.ptr;
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
- if (h->pfd_capacity < h->fd_count + 1) {
- h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
- gpr_free(h->pfds);
- gpr_free(h->watchers);
- h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity);
- h->watchers = gpr_malloc(sizeof(grpc_fd_watcher) * h->pfd_capacity);
- }
- nf = 0;
- np = 1;
- kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
- if (kfd == NULL) {
- /* Already kicked */
- return;
- }
- h->pfds[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
- h->pfds[0].events = POLLIN;
- h->pfds[0].revents = POLLOUT;
+ /* TODO(ctiller): perform just one malloc here if we exceed the inline case */
+ pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1));
+ watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1));
+ fd_count = 0;
+ pfd_count = 1;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = POLLOUT;
for (i = 0; i < h->fd_count; i++) {
int remove = grpc_fd_is_orphaned(h->fds[i]);
- for (nd = 0; nd < h->del_count; nd++) {
- if (h->fds[i] == h->dels[nd]) remove = 1;
+ for (j = 0; !remove && j < h->del_count; j++) {
+ if (h->fds[i] == h->dels[j]) remove = 1;
}
if (remove) {
GRPC_FD_UNREF(h->fds[i], "multipoller");
} else {
- h->fds[nf++] = h->fds[i];
- h->watchers[np].fd = h->fds[i];
- h->pfds[np].fd = h->fds[i]->fd;
- h->pfds[np].revents = 0;
- np++;
+ h->fds[fd_count++] = h->fds[i];
+ watchers[pfd_count].fd = h->fds[i];
+ pfds[pfd_count].fd = h->fds[i]->fd;
+ pfds[pfd_count].revents = 0;
+ pfd_count++;
}
}
- h->pfd_count = np;
- h->fd_count = nf;
- for (nd = 0; nd < h->del_count; nd++) {
- GRPC_FD_UNREF(h->dels[nd], "multipoller_del");
+ for (j = 0; j < h->del_count; j++) {
+ GRPC_FD_UNREF(h->dels[j], "multipoller_del");
}
h->del_count = 0;
- if (h->pfd_count == 0) {
- end_polling(pollset);
- return;
- }
- pollset->counter++;
+ h->fd_count = fd_count;
gpr_mu_unlock(&pollset->mu);
- for (i = 1; i < np; i++) {
- h->pfds[i].events = grpc_fd_begin_poll(h->watchers[i].fd, pollset, POLLIN,
- POLLOUT, &h->watchers[i]);
+ for (i = 1; i < pfd_count; i++) {
+ pfds[i].events = grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
+ POLLOUT, &watchers[i]);
}
- r = poll(h->pfds, h->pfd_count, timeout);
+ r = poll(pfds, pfd_count, timeout);
- end_polling(pollset);
+ for (i = 1; i < pfd_count; i++) {
+ grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
+ pfds[i].revents & POLLOUT);
+ }
if (r < 0) {
if (errno != EINTR) {
@@ -184,35 +158,31 @@ static void multipoll_with_poll_pollset_maybe_work(
} else if (r == 0) {
/* do nothing */
} else {
- if (h->pfds[0].revents & POLLIN) {
- grpc_pollset_kick_consume(&pollset->kick_state, kfd);
+ if (pfds[0].revents & POLLIN) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
- for (i = 1; i < np; i++) {
- if (h->watchers[i].fd == NULL) {
+ for (i = 1; i < pfd_count; i++) {
+ if (watchers[i].fd == NULL) {
continue;
}
- if (h->pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
- grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback);
+ if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
+ grpc_fd_become_readable(watchers[i].fd, allow_synchronous_callback);
}
- if (h->pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
- grpc_fd_become_writable(h->watchers[i].fd, allow_synchronous_callback);
+ if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
+ grpc_fd_become_writable(watchers[i].fd, allow_synchronous_callback);
}
}
}
- grpc_pollset_kick_post_poll(&pollset->kick_state, kfd);
- gpr_mu_lock(&pollset->mu);
- pollset->counter--;
-}
+ gpr_free(pfds);
+ gpr_free(watchers);
-static void multipoll_with_poll_pollset_kick(grpc_pollset *p) {
- grpc_pollset_force_kick(p);
+ gpr_mu_lock(&pollset->mu);
}
static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
size_t i;
pollset_hdr *h = pollset->data.ptr;
- GPR_ASSERT(pollset->counter == 0);
for (i = 0; i < h->fd_count; i++) {
GRPC_FD_UNREF(h->fds[i], "multipoller");
}
@@ -226,8 +196,6 @@ static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
multipoll_with_poll_pollset_finish_shutdown(pollset);
- gpr_free(h->pfds);
- gpr_free(h->watchers);
gpr_free(h->fds);
gpr_free(h->dels);
gpr_free(h);
@@ -237,7 +205,6 @@ static const grpc_pollset_vtable multipoll_with_poll_pollset = {
multipoll_with_poll_pollset_add_fd,
multipoll_with_poll_pollset_del_fd,
multipoll_with_poll_pollset_maybe_work,
- multipoll_with_poll_pollset_kick,
multipoll_with_poll_pollset_finish_shutdown,
multipoll_with_poll_pollset_destroy};
@@ -250,10 +217,6 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
h->fd_count = nfds;
h->fd_capacity = nfds;
h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
- h->pfd_count = 0;
- h->pfd_capacity = 0;
- h->pfds = NULL;
- h->watchers = NULL;
h->del_count = 0;
h->del_capacity = 0;
h->dels = NULL;
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index c8646af615..d3a9193af1 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -55,22 +55,60 @@
#include <grpc/support/useful.h>
GPR_TLS_DECL(g_current_thread_poller);
+GPR_TLS_DECL(g_current_thread_worker);
-void grpc_pollset_kick(grpc_pollset *p) {
- if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p && p->counter) {
- p->vtable->kick(p);
- }
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+int grpc_pollset_has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
}
-void grpc_pollset_force_kick(grpc_pollset *p) {
- if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
- grpc_pollset_kick_kick(&p->kick_state);
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (grpc_pollset_has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ } else {
+ return NULL;
}
}
-static void kick_using_pollset_kick(grpc_pollset *p) {
- if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
- grpc_pollset_kick_kick(&p->kick_state);
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ }
+ p->kicked_without_pollers = 1;
+ } else if (gpr_tls_get(&g_current_thread_worker) !=
+ (gpr_intptr)specific_worker) {
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ }
+ } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ push_back_worker(p, specific_worker);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ } else {
+ p->kicked_without_pollers = 1;
+ }
}
}
@@ -78,16 +116,12 @@ static void kick_using_pollset_kick(grpc_pollset *p) {
void grpc_pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
-
- /* Initialize kick fd state */
- grpc_pollset_kick_global_init();
+ grpc_wakeup_fd_global_init();
}
void grpc_pollset_global_shutdown(void) {
- /* destroy the kick pipes */
- grpc_pollset_kick_global_destroy();
-
gpr_tls_destroy(&g_current_thread_poller);
+ grpc_wakeup_fd_global_destroy();
}
/* main interface */
@@ -96,7 +130,7 @@ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
- grpc_pollset_kick_init(&pollset->kick_state);
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
@@ -134,27 +168,44 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
+int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ int added_worker = 0;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
+ /* this must happen before we (potentially) drop pollset->mu */
+ worker->next = worker->prev = NULL;
+ /* TODO(ctiller): pool these */
+ grpc_wakeup_fd_init(&worker->wakeup_fd);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
- return 1;
+ goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
- return 1;
+ goto done;
}
if (pollset->shutting_down) {
- return 1;
+ goto done;
+ }
+ if (!pollset->kicked_without_pollers) {
+ push_front_worker(pollset, worker);
+ added_worker = 1;
+ gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
+ pollset->vtable->maybe_work(pollset, worker, deadline, now, 1);
+ gpr_tls_set(&g_current_thread_poller, 0);
+ } else {
+ pollset->kicked_without_pollers = 0;
+ }
+done:
+ grpc_wakeup_fd_destroy(&worker->wakeup_fd);
+ if (added_worker) {
+ remove_worker(pollset, worker);
}
- gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
- pollset->vtable->maybe_work(pollset, deadline, now, 1);
- gpr_tls_set(&g_current_thread_poller, 0);
if (pollset->shutting_down) {
- if (pollset->counter > 0) {
- grpc_pollset_kick(pollset);
+ if (grpc_pollset_has_workers(pollset)) {
+ grpc_pollset_kick(pollset, NULL);
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
@@ -177,15 +228,13 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
- pollset->counter == 0) {
+ !grpc_pollset_has_workers(pollset)) {
pollset->called_shutdown = 1;
call_shutdown = 1;
}
pollset->shutdown_done_cb = shutdown_done;
pollset->shutdown_done_arg = shutdown_done_arg;
- if (pollset->counter > 0) {
- grpc_pollset_kick(pollset);
- }
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
if (call_shutdown) {
@@ -196,8 +245,8 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void grpc_pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->vtable->destroy(pollset);
- grpc_pollset_kick_destroy(&pollset->kick_state);
gpr_mu_destroy(&pollset->mu);
}
@@ -248,8 +297,8 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_lock(&pollset->mu);
/* First we need to ensure that nobody is polling concurrently */
- if (pollset->counter != 0) {
- grpc_pollset_kick(pollset);
+ if (grpc_pollset_has_workers(pollset)) {
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
grpc_iomgr_add_callback(&up_args->promotion_closure);
gpr_mu_unlock(&pollset->mu);
return;
@@ -264,7 +313,8 @@ static void basic_do_promote(void *args, int success) {
pollset->in_flight_cbs--;
if (pollset->shutting_down) {
/* We don't care about this pollset anymore. */
- if (pollset->in_flight_cbs == 0 && pollset->counter == 0 && !pollset->called_shutdown) {
+ if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
+ GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->called_shutdown = 1;
do_shutdown_cb = 1;
}
@@ -307,7 +357,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) goto exit;
- if (!pollset->counter) {
+ if (!grpc_pollset_has_workers(pollset)) {
/* Fast path -- no in flight cbs */
/* TODO(klempner): Comment this out and fix any test failures or establish
* they are due to timing issues */
@@ -343,7 +393,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
up_args->promotion_closure.cb_arg = up_args;
grpc_iomgr_add_callback(&up_args->promotion_closure);
- grpc_pollset_kick(pollset);
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:
if (and_unlock_pollset) {
@@ -365,12 +415,12 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
}
static void basic_pollset_maybe_work(grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
struct pollfd pfd[2];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
- grpc_kick_fd_info *kfd;
int timeout;
int r;
int nfds;
@@ -387,16 +437,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
fd = pollset->data.ptr = NULL;
}
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
- kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
- if (kfd == NULL) {
- /* Already kicked */
- return;
- }
- pfd[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
+ pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfd[0].events = POLLIN;
pfd[0].revents = 0;
nfds = 1;
- pollset->counter++;
if (fd) {
pfd[1].fd = fd->fd;
pfd[1].revents = 0;
@@ -428,7 +472,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
/* do nothing */
} else {
if (pfd[0].revents & POLLIN) {
- grpc_pollset_kick_consume(&pollset->kick_state, kfd);
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
if (nfds > 1) {
if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
@@ -440,14 +484,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
}
}
- grpc_pollset_kick_post_poll(&pollset->kick_state, kfd);
-
gpr_mu_lock(&pollset->mu);
- pollset->counter--;
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->counter == 0);
if (pollset->data.ptr != NULL) {
GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
pollset->data.ptr = NULL;
@@ -455,14 +495,13 @@ static void basic_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable basic_pollset = {
- basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
- kick_using_pollset_kick, basic_pollset_destroy, basic_pollset_destroy};
+ basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
+ basic_pollset_destroy, basic_pollset_destroy};
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
pollset->vtable = &basic_pollset;
- pollset->counter = 0;
pollset->data.ptr = fd_or_null;
- if (fd_or_null) {
+ if (fd_or_null != NULL) {
GRPC_FD_REF(fd_or_null, "basicpoll");
}
}
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 37de1276d1..1c1b736193 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -35,8 +35,7 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#include <grpc/support/sync.h>
-
-#include "src/core/iomgr/pollset_kick_posix.h"
+#include "src/core/iomgr/wakeup_fd_posix.h"
typedef struct grpc_pollset_vtable grpc_pollset_vtable;
@@ -45,6 +44,12 @@ typedef struct grpc_pollset_vtable grpc_pollset_vtable;
use the struct tag */
struct grpc_fd;
+typedef struct grpc_pollset_worker {
+ grpc_wakeup_fd wakeup_fd;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+} grpc_pollset_worker;
+
typedef struct grpc_pollset {
/* pollsets under posix can mutate representation as fds are added and
removed.
@@ -52,11 +57,11 @@ typedef struct grpc_pollset {
few fds, and an epoll() based implementation for many fds */
const grpc_pollset_vtable *vtable;
gpr_mu mu;
- grpc_pollset_kick_state kick_state;
- int counter;
+ grpc_pollset_worker root_worker;
int in_flight_cbs;
int shutting_down;
int called_shutdown;
+ int kicked_without_pollers;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
union {
@@ -70,9 +75,9 @@ struct grpc_pollset_vtable {
int and_unlock_pollset);
void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd,
int and_unlock_pollset);
- void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
- gpr_timespec now, int allow_synchronous_callback);
- void (*kick)(grpc_pollset *pollset);
+ void (*maybe_work)(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now,
+ int allow_synchronous_callback);
void (*finish_shutdown)(grpc_pollset *pollset);
void (*destroy)(grpc_pollset *pollset);
};
@@ -85,22 +90,16 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd);
poll after an fd is orphaned) */
void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd);
-/* Force any current pollers to break polling: it's the callers responsibility
- to ensure that the pollset indeed needs to be kicked - no verification that
- the pollset is actually performing polling work is done. At worst this will
- result in spurious wakeups if performed at the wrong moment.
- Does not touch pollset->mu. */
-void grpc_pollset_force_kick(grpc_pollset *pollset);
/* Returns the fd to listen on for kicks */
int grpc_kick_read_fd(grpc_pollset *p);
/* Call after polling has been kicked to leave the kicked state */
void grpc_kick_drain(grpc_pollset *p);
/* Convert a timespec to milliseconds:
- - very small or negative poll times are clamped to zero to do a
+ - very small or negative poll times are clamped to zero to do a
non-blocking poll (which becomes spin polling)
- other small values are rounded up to one millisecond
- - longer than a millisecond polls are rounded up to the next nearest
+ - longer than a millisecond polls are rounded up to the next nearest
millisecond to avoid spinning
- infinite timeouts are converted to -1 */
int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now);
@@ -114,4 +113,8 @@ extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller;
void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
size_t fd_count);
+/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must
+ * be locked) */
+int grpc_pollset_has_workers(grpc_pollset *pollset);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index a9c4739c7c..22dc5891c3 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -42,6 +42,38 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+static int has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
+}
+
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ }
+ else {
+ return NULL;
+ }
+}
+
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
+
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
@@ -50,7 +82,8 @@
void grpc_pollset_init(grpc_pollset *pollset) {
memset(pollset, 0, sizeof(*pollset));
gpr_mu_init(&pollset->mu);
- gpr_cv_init(&pollset->cv);
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
+ pollset->kicked_without_pollers = 0;
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
@@ -58,34 +91,66 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void *shutdown_done_arg) {
gpr_mu_lock(&pollset->mu);
pollset->shutting_down = 1;
- gpr_cv_broadcast(&pollset->cv);
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
shutdown_done(shutdown_done_arg);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
- gpr_cv_destroy(&pollset->cv);
}
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
+int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) {
gpr_timespec now;
+ int added_worker = 0;
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
+ worker->next = worker->prev = NULL;
+ gpr_cv_init(&worker->cv);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
- return 1 /* GPR_TRUE */;
+ goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
- return 1 /* GPR_TRUE */;
+ goto done;
}
- if (!pollset->shutting_down) {
- gpr_cv_wait(&pollset->cv, &pollset->mu, deadline);
+ if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
+ push_front_worker(pollset, worker);
+ added_worker = 1;
+ gpr_cv_wait(&worker->cv, &pollset->mu, deadline);
+ } else {
+ pollset->kicked_without_pollers = 0;
+ }
+done:
+ gpr_cv_destroy(&worker->cv);
+ if (added_worker) {
+ remove_worker(pollset, worker);
}
return 1 /* GPR_TRUE */;
}
-void grpc_pollset_kick(grpc_pollset *p) { gpr_cv_signal(&p->cv); }
+void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ gpr_cv_signal(&specific_worker->cv);
+ }
+ p->kicked_without_pollers = 1;
+ } else {
+ gpr_cv_signal(&specific_worker->cv);
+ }
+ } else {
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ push_back_worker(p, specific_worker);
+ gpr_cv_signal(&specific_worker->cv);
+ } else {
+ p->kicked_without_pollers = 1;
+ }
+ }
+}
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index c9b8d3f374..4efa5a1717 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -40,12 +40,20 @@
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. A Windows "pollset" is merely a mutex
- and a condition variable, used to synchronize with the IOCP. */
+ used to synchronize with the IOCP, and workers are condition variables
+ used to block threads until work is ready. */
+
+typedef struct grpc_pollset_worker {
+ gpr_cv cv;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+} grpc_pollset_worker;
typedef struct grpc_pollset {
gpr_mu mu;
- gpr_cv cv;
int shutting_down;
+ int kicked_without_pollers;
+ grpc_pollset_worker root_worker;
} grpc_pollset;
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c
index 71ac12e87b..65ec1f94ac 100644
--- a/src/core/iomgr/sockaddr_utils.c
+++ b/src/core/iomgr/sockaddr_utils.c
@@ -170,6 +170,11 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
char *grpc_sockaddr_to_uri(const struct sockaddr *addr) {
char *temp;
char *result;
+ struct sockaddr_in addr_normalized;
+
+ if (grpc_sockaddr_is_v4mapped(addr, &addr_normalized)) {
+ addr = (const struct sockaddr *)&addr_normalized;
+ }
switch (addr->sa_family) {
case AF_INET:
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index bcd2aa8536..0adbe9507c 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -186,6 +186,17 @@ error:
return -1;
}
+static void decrement_active_ports_and_notify(server_port *sp) {
+ sp->shutting_down = 0;
+ sp->socket->read_info.outstanding = 0;
+ gpr_mu_lock(&sp->server->mu);
+ GPR_ASSERT(sp->server->active_ports > 0);
+ if (0 == --sp->server->active_ports) {
+ gpr_cv_broadcast(&sp->server->cv);
+ }
+ gpr_mu_unlock(&sp->server->mu);
+}
+
/* start_accept will reference that for the IOCP notification request. */
static void on_accept(void *arg, int from_iocp);
@@ -234,6 +245,15 @@ static void start_accept(server_port *port) {
return;
failure:
+ if (port->shutting_down) {
+ /* We are abandoning the listener port, take that into account to prevent
+ occasional hangs on shutdown. The hang happens when sp->shutting_down
+ change is not seen by on_accept and we proceed to trying new accept,
+ but we fail there because the listening port has been closed in the
+ meantime. */
+ decrement_active_ports_and_notify(port);
+ return;
+ }
utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, message, utf8_message);
gpr_free(utf8_message);
@@ -277,14 +297,7 @@ static void on_accept(void *arg, int from_iocp) {
if (sp->shutting_down) {
/* During the shutdown case, we ARE expecting an error. So that's well,
and we can wake up the shutdown thread. */
- sp->shutting_down = 0;
- sp->socket->read_info.outstanding = 0;
- gpr_mu_lock(&sp->server->mu);
- GPR_ASSERT(sp->server->active_ports > 0);
- if (0 == --sp->server->active_ports) {
- gpr_cv_broadcast(&sp->server->cv);
- }
- gpr_mu_unlock(&sp->server->mu);
+ decrement_active_ports_and_notify(sp);
return;
} else {
char *utf8_message = gpr_format_message(WSAGetLastError());
diff --git a/src/core/iomgr/wakeup_fd_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c
index 99c32bb9db..52912235f8 100644
--- a/src/core/iomgr/wakeup_fd_eventfd.c
+++ b/src/core/iomgr/wakeup_fd_eventfd.c
@@ -42,7 +42,7 @@
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/log.h>
-static void eventfd_create(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_create(grpc_wakeup_fd *fd_info) {
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
/* TODO(klempner): Handle failure more gracefully */
GPR_ASSERT(efd >= 0);
@@ -50,7 +50,7 @@ static void eventfd_create(grpc_wakeup_fd_info *fd_info) {
fd_info->write_fd = -1;
}
-static void eventfd_consume(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_consume(grpc_wakeup_fd *fd_info) {
eventfd_t value;
int err;
do {
@@ -58,14 +58,14 @@ static void eventfd_consume(grpc_wakeup_fd_info *fd_info) {
} while (err < 0 && errno == EINTR);
}
-static void eventfd_wakeup(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_wakeup(grpc_wakeup_fd *fd_info) {
int err;
do {
err = eventfd_write(fd_info->read_fd, 1);
} while (err < 0 && errno == EINTR);
}
-static void eventfd_destroy(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_destroy(grpc_wakeup_fd *fd_info) {
close(fd_info->read_fd);
}
diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c
index f895478990..9fc4ee2388 100644
--- a/src/core/iomgr/wakeup_fd_pipe.c
+++ b/src/core/iomgr/wakeup_fd_pipe.c
@@ -44,7 +44,7 @@
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/log.h>
-static void pipe_create(grpc_wakeup_fd_info *fd_info) {
+static void pipe_init(grpc_wakeup_fd *fd_info) {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
GPR_ASSERT(0 == pipe(pipefd));
@@ -54,7 +54,7 @@ static void pipe_create(grpc_wakeup_fd_info *fd_info) {
fd_info->write_fd = pipefd[1];
}
-static void pipe_consume(grpc_wakeup_fd_info *fd_info) {
+static void pipe_consume(grpc_wakeup_fd *fd_info) {
char buf[128];
int r;
@@ -74,13 +74,13 @@ static void pipe_consume(grpc_wakeup_fd_info *fd_info) {
}
}
-static void pipe_wakeup(grpc_wakeup_fd_info *fd_info) {
+static void pipe_wakeup(grpc_wakeup_fd *fd_info) {
char c = 0;
while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR)
;
}
-static void pipe_destroy(grpc_wakeup_fd_info *fd_info) {
+static void pipe_destroy(grpc_wakeup_fd *fd_info) {
close(fd_info->read_fd);
close(fd_info->write_fd);
}
@@ -91,7 +91,7 @@ static int pipe_check_availability(void) {
}
const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = {
- pipe_create, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability
-};
+ pipe_init, pipe_consume, pipe_wakeup, pipe_destroy,
+ pipe_check_availability};
#endif /* GPR_POSIX_WAKUP_FD */
diff --git a/src/core/iomgr/wakeup_fd_posix.c b/src/core/iomgr/wakeup_fd_posix.c
index d3cc3ec570..e48f5223fa 100644
--- a/src/core/iomgr/wakeup_fd_posix.c
+++ b/src/core/iomgr/wakeup_fd_posix.c
@@ -57,19 +57,19 @@ void grpc_wakeup_fd_global_destroy(void) {
wakeup_fd_vtable = NULL;
}
-void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info) {
- wakeup_fd_vtable->create(fd_info);
+void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
+ wakeup_fd_vtable->init(fd_info);
}
-void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info) {
+void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->consume(fd_info);
}
-void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info) {
+void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->wakeup(fd_info);
}
-void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info) {
+void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->destroy(fd_info);
}
diff --git a/src/core/iomgr/wakeup_fd_posix.h b/src/core/iomgr/wakeup_fd_posix.h
index 1b0ff70c7f..a4da4df51f 100644
--- a/src/core/iomgr/wakeup_fd_posix.h
+++ b/src/core/iomgr/wakeup_fd_posix.h
@@ -69,28 +69,28 @@ void grpc_wakeup_fd_global_destroy(void);
* purposes only.*/
void grpc_wakeup_fd_global_init_force_fallback(void);
-typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info;
+typedef struct grpc_wakeup_fd grpc_wakeup_fd;
typedef struct grpc_wakeup_fd_vtable {
- void (*create)(grpc_wakeup_fd_info *fd_info);
- void (*consume)(grpc_wakeup_fd_info *fd_info);
- void (*wakeup)(grpc_wakeup_fd_info *fd_info);
- void (*destroy)(grpc_wakeup_fd_info *fd_info);
+ void (*init)(grpc_wakeup_fd *fd_info);
+ void (*consume)(grpc_wakeup_fd *fd_info);
+ void (*wakeup)(grpc_wakeup_fd *fd_info);
+ void (*destroy)(grpc_wakeup_fd *fd_info);
/* Must be called before calling any other functions */
int (*check_availability)(void);
} grpc_wakeup_fd_vtable;
-struct grpc_wakeup_fd_info {
+struct grpc_wakeup_fd {
int read_fd;
int write_fd;
};
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
-void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info);
+void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info);
+void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info);
+void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info);
+void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info);
/* Defined in some specialized implementation's .c file, or by
* wakeup_fd_nospecial.c if no such implementation exists. */
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index de1929fe76..f368819597 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -80,7 +80,7 @@ static void on_compute_engine_detection_http_response(
}
gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset));
detector->is_done = 1;
- grpc_pollset_kick(&detector->pollset);
+ grpc_pollset_kick(&detector->pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
}
@@ -112,7 +112,9 @@ static int is_stack_running_on_compute_engine(void) {
called once for the lifetime of the process by the default credentials. */
gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
while (!detector.is_done) {
- grpc_pollset_work(&detector.pollset, gpr_inf_future(GPR_CLOCK_REALTIME));
+ grpc_pollset_worker worker;
+ grpc_pollset_work(&detector.pollset, &worker,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 81f673f856..688a586e18 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -229,7 +229,9 @@ static void destroy_channel(void *p, int ok) {
registered_call *rc = channel->registered_calls;
channel->registered_calls = rc->next;
GRPC_MDELEM_UNREF(rc->path);
- GRPC_MDELEM_UNREF(rc->authority);
+ if (rc->authority) {
+ GRPC_MDELEM_UNREF(rc->authority);
+ }
gpr_free(rc);
}
grpc_mdctx_unref(channel->metadata_context);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 3f60b0b0ba..00429fac19 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -45,6 +45,11 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+typedef struct {
+ grpc_pollset_worker *worker;
+ void *tag;
+} plucker;
+
/* Completion queue structure */
struct grpc_completion_queue {
/** completed events */
@@ -60,6 +65,8 @@ struct grpc_completion_queue {
int shutdown;
int shutdown_called;
int is_server_cq;
+ int num_pluckers;
+ plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
};
grpc_completion_queue *grpc_completion_queue_create(void) {
@@ -117,6 +124,8 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
int shutdown;
+ int i;
+ grpc_pollset_worker *pluck_worker;
storage->tag = tag;
storage->done = done;
@@ -130,7 +139,14 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
- grpc_pollset_kick(&cc->pollset);
+ pluck_worker = NULL;
+ for (i = 0; i < cc->num_pluckers; i++) {
+ if (cc->pluckers[i].tag == tag) {
+ pluck_worker = cc->pluckers[i].worker;
+ break;
+ }
+ }
+ grpc_pollset_kick(&cc->pollset, pluck_worker);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
} else {
cc->completed_tail->next =
@@ -147,6 +163,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
grpc_event ret;
+ grpc_pollset_worker worker;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -172,7 +189,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!grpc_pollset_work(&cc->pollset, deadline)) {
+ if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
@@ -184,11 +201,37 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
return ret;
}
+static int add_plucker(grpc_completion_queue *cc, void *tag,
+ grpc_pollset_worker *worker) {
+ if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
+ return 0;
+ }
+ cc->pluckers[cc->num_pluckers].tag = tag;
+ cc->pluckers[cc->num_pluckers].worker = worker;
+ cc->num_pluckers++;
+ return 1;
+}
+
+static void del_plucker(grpc_completion_queue *cc, void *tag,
+ grpc_pollset_worker *worker) {
+ int i;
+ for (i = 0; i < cc->num_pluckers; i++) {
+ if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
+ cc->num_pluckers--;
+ GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
+ return;
+ }
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
+ grpc_pollset_worker worker;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -219,12 +262,24 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!grpc_pollset_work(&cc->pollset, deadline)) {
+ if (!add_plucker(cc, tag, &worker)) {
+ gpr_log(GPR_DEBUG,
+ "Too many outstanding grpc_completion_queue_pluck calls: maximum is %d",
+ GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
+ /* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
+ if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+ del_plucker(cc, tag, &worker);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_TIMEOUT;
+ break;
+ }
+ del_plucker(cc, tag, &worker);
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
@@ -261,15 +316,6 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return &cc->pollset;
}
-void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- grpc_pollset_kick(&cc->pollset);
- grpc_pollset_work(&cc->pollset,
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(100, GPR_TIMESPAN)));
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index f944f48d8e..8de024aaea 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -77,8 +77,6 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
-void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);
-
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index c370a9b8ab..29d893db71 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -327,6 +327,14 @@ static void request_matcher_zombify_all_pending_calls(
}
}
+static void request_matcher_kill_requests(grpc_server *server,
+ request_matcher *rm) {
+ int request_id;
+ while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
+ fail_call(server, &server->requested_calls[request_id]);
+ }
+}
+
/*
* server proper
*/
@@ -492,12 +500,25 @@ static int num_channels(grpc_server *server) {
return n;
}
+static void kill_pending_work_locked(grpc_server *server) {
+ registered_method *rm;
+ request_matcher_kill_requests(server, &server->unregistered_request_matcher);
+ request_matcher_zombify_all_pending_calls(
+ &server->unregistered_request_matcher);
+ for (rm = server->registered_methods; rm; rm = rm->next) {
+ request_matcher_kill_requests(server, &rm->request_matcher);
+ request_matcher_zombify_all_pending_calls(&rm->request_matcher);
+ }
+}
+
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
}
+ kill_pending_work_locked(server);
+
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
@@ -947,52 +968,15 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
op.set_accept_stream_user_data = chand;
op.on_connectivity_state_change = &chand->channel_connectivity_changed;
op.connectivity_state = &chand->connectivity_state;
+ op.disconnect = gpr_atm_acq_load(&s->shutdown_flag);
grpc_transport_perform_op(transport, &op);
}
-typedef struct {
- requested_call **requests;
- size_t count;
- size_t capacity;
-} request_killer;
-
-static void request_killer_init(request_killer *rk) {
- memset(rk, 0, sizeof(*rk));
-}
-
-static void request_killer_add(request_killer *rk, requested_call *rc) {
- if (rk->capacity == rk->count) {
- rk->capacity = GPR_MAX(8, rk->capacity * 2);
- rk->requests =
- gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests));
- }
- rk->requests[rk->count++] = rc;
-}
-
-static void request_killer_add_request_matcher(request_killer *rk,
- grpc_server *server,
- request_matcher *rm) {
- int request_id;
- while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
- request_killer_add(rk, &server->requested_calls[request_id]);
- }
-}
-
-static void request_killer_run(request_killer *rk, grpc_server *server) {
- size_t i;
- for (i = 0; i < rk->count; i++) {
- fail_call(server, rk->requests[i]);
- }
- gpr_free(rk->requests);
-}
-
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
- registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
- request_killer reqkill;
GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
@@ -1013,27 +997,16 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
channel_broadcaster_init(server, &broadcaster);
- request_killer_init(&reqkill);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- request_killer_add_request_matcher(&reqkill, server,
- &server->unregistered_request_matcher);
- request_matcher_zombify_all_pending_calls(
- &server->unregistered_request_matcher);
- for (rm = server->registered_methods; rm; rm = rm->next) {
- request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(&rm->request_matcher);
- }
+ kill_pending_work_locked(server);
gpr_mu_unlock(&server->mu_call);
gpr_atm_rel_store(&server->shutdown_flag, 1);
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
- /* terminate all the requested calls */
- request_killer_run(&reqkill, server);
-
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
l->destroy(server, l->arg);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 78c53466b3..4ab845bc00 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -80,7 +80,7 @@ static void destroy(grpc_server *server, void *tcpp) {
grpc_tcp_server_destroy(tcp, grpc_server_listener_destroy_done, server);
}
-int grpc_server_add_http2_port(grpc_server *server, const char *addr) {
+int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
grpc_resolved_addresses *resolved = NULL;
grpc_tcp_server *tcp = NULL;
size_t i;
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 74b3a591d5..42cf0ecd5b 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -119,6 +119,10 @@ typedef enum {
GRPC_WRITE_STATE_SENT_CLOSE
} grpc_chttp2_write_state;
+/* flags that can be or'd into stream_global::writing_now */
+#define GRPC_CHTTP2_WRITING_DATA 1
+#define GRPC_CHTTP2_WRITING_WINDOW 2
+
typedef enum {
GRPC_DONT_SEND_CLOSED = 0,
GRPC_SEND_CLOSED,
@@ -382,7 +386,7 @@ typedef struct {
gpr_uint8 published_cancelled;
/** is this stream in the stream map? (boolean) */
gpr_uint8 in_stream_map;
- /** is this stream actively being written? */
+ /** bitmask of GRPC_CHTTP2_WRITING_xxx above */
gpr_uint8 writing_now;
/** has anything been written to this stream? */
gpr_uint8 written_anything;
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 9e68c1e146..9c3ad7a777 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -164,9 +164,6 @@ void grpc_chttp2_list_add_first_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
GPR_ASSERT(stream_global->id != 0);
- gpr_log(GPR_DEBUG, "add:%d:%d:%d:%d", stream_global->id,
- stream_global->write_state, stream_global->in_stream_map,
- stream_global->read_closed);
stream_list_add_head(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index d39b0c42f7..b55e81fdca 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -77,7 +77,6 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->id = stream_global->id;
stream_writing->send_closed = GRPC_DONT_SEND_CLOSED;
- GPR_ASSERT(!stream_global->writing_now);
if (stream_global->outgoing_sopb) {
window_delta =
@@ -123,11 +122,13 @@ int grpc_chttp2_unlocking_check_writes(
stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
- stream_global->writing_now = 1;
- grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
- } else if (stream_writing->sopb.nops > 0 ||
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- stream_global->writing_now = 1;
+ stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW;
+ }
+ if (stream_writing->sopb.nops > 0 ||
+ stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
+ stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA;
+ }
+ if (stream_global->writing_now != 0) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
}
@@ -183,6 +184,7 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
stream_writing->id, &transport_writing->hpack_compressor,
&transport_writing->outbuf);
+ stream_writing->sopb.nops = 0;
}
if (stream_writing->announce_window > 0) {
gpr_slice_buffer_add(
@@ -191,7 +193,6 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
stream_writing->id, stream_writing->announce_window));
stream_writing->announce_window = 0;
}
- stream_writing->sopb.nops = 0;
if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_rst_stream_create(stream_writing->id,
@@ -215,20 +216,23 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
- GPR_ASSERT(stream_global->writing_now);
- stream_global->writing_now = 0;
- if (stream_global->outgoing_sopb != NULL &&
- stream_global->outgoing_sopb->nops == 0) {
- stream_global->outgoing_sopb = NULL;
- grpc_chttp2_schedule_closure(transport_global,
- stream_global->send_done_closure, 1);
- }
+ GPR_ASSERT(stream_global->writing_now != 0);
if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client) {
stream_global->read_closed = 1;
}
}
+ if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA) {
+ if (stream_global->outgoing_sopb != NULL &&
+ stream_global->outgoing_sopb->nops == 0) {
+ GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
+ stream_global->outgoing_sopb = NULL;
+ grpc_chttp2_schedule_closure(transport_global,
+ stream_global->send_done_closure, 1);
+ }
+ }
+ stream_global->writing_now = 0;
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index c8c4207208..a9f91b64d5 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -834,6 +834,12 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
stream_global);
} else {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
+ if (stream_global->outgoing_sopb != NULL) {
+ grpc_sopb_reset(stream_global->outgoing_sopb);
+ stream_global->outgoing_sopb = NULL;
+ grpc_chttp2_schedule_closure(transport_global,
+ stream_global->send_done_closure, 1);
+ }
stream_global->read_closed = 1;
if (!stream_global->published_cancelled) {
char buffer[GPR_LTOA_MIN_BUFSIZE];
@@ -860,7 +866,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
if (!stream_global->publish_sopb) {
continue;
}
- if (stream_global->writing_now) {
+ if (stream_global->writing_now != 0) {
continue;
}
/* FIXME(ctiller): we include in_stream_map in our computation of
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 5df81e641e..ee143d68a0 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -51,13 +51,16 @@
namespace grpc {
-Channel::Channel(const grpc::string& target, grpc_channel* channel)
- : target_(target), c_channel_(channel) {}
+Channel::Channel(grpc_channel* channel) : c_channel_(channel) {}
+
+Channel::Channel(const grpc::string& host, grpc_channel* channel)
+ : host_(host), c_channel_(channel) {}
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) {
+ const char* host_str = host_.empty() ? NULL : host_.c_str();
auto c_call =
method.channel_tag() && context->authority().empty()
? grpc_channel_create_registered_call(c_channel_, cq->cq(),
@@ -65,7 +68,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
context->raw_deadline())
: grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
context->authority().empty()
- ? target_.c_str()
+ ? host_str
: context->authority().c_str(),
context->raw_deadline());
grpc_census_call_set_context(c_call, context->census_context());
@@ -86,7 +89,8 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
}
void* Channel::RegisterMethod(const char* method) {
- return grpc_channel_register_call(c_channel_, method, target_.c_str());
+ return grpc_channel_register_call(c_channel_, method,
+ host_.empty() ? NULL : host_.c_str());
}
} // namespace grpc
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index 9108713c58..8660146856 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -52,7 +52,8 @@ class StreamContextInterface;
class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
public:
- Channel(const grpc::string& target, grpc_channel* c_channel);
+ explicit Channel(grpc_channel* c_channel);
+ Channel(const grpc::string& host, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE;
virtual void* RegisterMethod(const char* method) GRPC_OVERRIDE;
@@ -62,7 +63,7 @@ class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
Call* call) GRPC_OVERRIDE;
private:
- const grpc::string target_;
+ const grpc::string host_;
grpc_channel* const c_channel_; // owned
};
diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc
index dbe2694a78..21d01b739d 100644
--- a/src/cpp/client/create_channel.cc
+++ b/src/cpp/client/create_channel.cc
@@ -51,7 +51,7 @@ std::shared_ptr<ChannelInterface> CreateChannel(
cp_args.SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING,
user_agent_prefix.str());
return creds ? creds->CreateChannel(target, cp_args)
- : std::shared_ptr<ChannelInterface>(new Channel(
- target, grpc_lame_client_channel_create(NULL)));
+ : std::shared_ptr<ChannelInterface>(
+ new Channel(grpc_lame_client_channel_create(NULL)));
}
} // namespace grpc
diff --git a/src/cpp/client/insecure_credentials.cc b/src/cpp/client/insecure_credentials.cc
index e802fa8034..d8dcaa1436 100644
--- a/src/cpp/client/insecure_credentials.cc
+++ b/src/cpp/client/insecure_credentials.cc
@@ -49,7 +49,7 @@ class InsecureCredentialsImpl GRPC_FINAL : public Credentials {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return std::shared_ptr<ChannelInterface>(new Channel(
- target, grpc_insecure_channel_create(target.c_str(), &channel_args)));
+ grpc_insecure_channel_create(target.c_str(), &channel_args)));
}
// InsecureCredentials should not be applied to a call.
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index abf0cb387e..2d6114e06b 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -44,8 +44,7 @@ std::shared_ptr<grpc::ChannelInterface> SecureCredentials::CreateChannel(
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return std::shared_ptr<ChannelInterface>(new Channel(
- args.GetSslTargetNameOverride().empty() ? target
- : args.GetSslTargetNameOverride(),
+ args.GetSslTargetNameOverride(),
grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args)));
}
diff --git a/src/cpp/server/insecure_server_credentials.cc b/src/cpp/server/insecure_server_credentials.cc
index aca3568e59..800cd36caa 100644
--- a/src/cpp/server/insecure_server_credentials.cc
+++ b/src/cpp/server/insecure_server_credentials.cc
@@ -41,7 +41,7 @@ class InsecureServerCredentialsImpl GRPC_FINAL : public ServerCredentials {
public:
int AddPortToServer(const grpc::string& addr,
grpc_server* server) GRPC_OVERRIDE {
- return grpc_server_add_http2_port(server, addr.c_str());
+ return grpc_server_add_insecure_http2_port(server, addr.c_str());
}
};
} // namespace
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index cf19556e7a..04373397f9 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -50,16 +50,23 @@ namespace grpc {
class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {}
+ CompletionOp() : has_tag_(false), tag_(nullptr), refs_(2), finalized_(false), cancelled_(0) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool CheckCancelled(CompletionQueue* cq);
+ void set_tag(void* tag) {
+ has_tag_ = true;
+ tag_ = tag;
+ }
+
void Unref();
private:
+ bool has_tag_;
+ void* tag_;
grpc::mutex mu_;
int refs_;
bool finalized_;
@@ -90,18 +97,25 @@ void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
+ bool ret = false;
+ if (has_tag_) {
+ *tag = tag_;
+ ret = true;
+ }
if (!*status) cancelled_ = 1;
if (--refs_ == 0) {
lock.unlock();
delete this;
}
- return false;
+ return ret;
}
// ServerContext body
ServerContext::ServerContext()
: completion_op_(nullptr),
+ has_notify_when_done_tag_(false),
+ async_notify_when_done_tag_(nullptr),
call_(nullptr),
cq_(nullptr),
sent_initial_metadata_(false) {}
@@ -109,6 +123,8 @@ ServerContext::ServerContext()
ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count)
: completion_op_(nullptr),
+ has_notify_when_done_tag_(false),
+ async_notify_when_done_tag_(nullptr),
deadline_(deadline),
call_(nullptr),
cq_(nullptr),
@@ -133,6 +149,9 @@ ServerContext::~ServerContext() {
void ServerContext::BeginCompletionOp(Call* call) {
GPR_ASSERT(!completion_op_);
completion_op_ = new CompletionOp();
+ if (has_notify_when_done_tag_) {
+ completion_op_->set_tag(async_notify_when_done_tag_);
+ }
call->PerformOps(completion_op_);
}
diff --git a/src/csharp/Grpc.Auth/OAuth2Interceptors.cs b/src/csharp/Grpc.Auth/OAuth2Interceptors.cs
index c785ca5a16..cc9d2c175f 100644
--- a/src/csharp/Grpc.Auth/OAuth2Interceptors.cs
+++ b/src/csharp/Grpc.Auth/OAuth2Interceptors.cs
@@ -119,7 +119,5 @@ namespace Grpc.Auth
return new Metadata.Entry(AuthorizationHeader, Schema + " " + accessToken);
}
}
-
-
}
}
diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
new file mode 100644
index 0000000000..60b45176e5
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
@@ -0,0 +1,91 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class ChannelTest
+ {
+ [TestFixtureTearDown]
+ public void CleanupClass()
+ {
+ GrpcEnvironment.Shutdown();
+ }
+
+ [Test]
+ public void Constructor_RejectsInvalidParams()
+ {
+ Assert.Throws(typeof(NullReferenceException), () => new Channel(null, Credentials.Insecure));
+ }
+
+ [Test]
+ public void State_IdleAfterCreation()
+ {
+ using (var channel = new Channel("localhost", Credentials.Insecure))
+ {
+ Assert.AreEqual(ChannelState.Idle, channel.State);
+ }
+ }
+
+ [Test]
+ public void WaitForStateChangedAsync_InvalidArgument()
+ {
+ using (var channel = new Channel("localhost", Credentials.Insecure))
+ {
+ Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure));
+ }
+ }
+
+ [Test]
+ public void Target()
+ {
+ using (var channel = new Channel("127.0.0.1", Credentials.Insecure))
+ {
+ Assert.IsTrue(channel.Target.Contains("127.0.0.1"));
+ }
+ }
+
+ [Test]
+ public void Dispose_IsIdempotent()
+ {
+ var channel = new Channel("localhost", Credentials.Insecure);
+ channel.Dispose();
+ channel.Dispose();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 540fe756c0..bf7cc3fbf3 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -77,11 +77,13 @@ namespace Grpc.Core.Tests
[SetUp]
public void Init()
{
- server = new Server();
- server.AddServiceDefinition(ServiceDefinition);
- int port = server.AddPort(Host, Server.PickUnusedPort, ServerCredentials.Insecure);
+ server = new Server
+ {
+ Services = { ServiceDefinition },
+ Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
+ };
server.Start();
- channel = new Channel(Host, port, Credentials.Insecure);
+ channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure);
}
[TearDown]
@@ -276,6 +278,30 @@ namespace Grpc.Core.Tests
Assert.IsTrue(peer.Contains(Host));
}
+ [Test]
+ public async Task Channel_WaitForStateChangedAsync()
+ {
+ Assert.Throws(typeof(TaskCanceledException),
+ async () => await channel.WaitForStateChangedAsync(channel.State, DateTime.UtcNow.AddMilliseconds(10)));
+
+ var stateChangedTask = channel.WaitForStateChangedAsync(channel.State);
+
+ var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
+ await Calls.AsyncUnaryCall(internalCall, "abc", CancellationToken.None);
+
+ await stateChangedTask;
+ Assert.AreEqual(ChannelState.Ready, channel.State);
+ }
+
+ [Test]
+ public async Task Channel_ConnectAsync()
+ {
+ await channel.ConnectAsync();
+ Assert.AreEqual(ChannelState.Ready, channel.State);
+ await channel.ConnectAsync(DateTime.UtcNow.AddMilliseconds(1000));
+ Assert.AreEqual(ChannelState.Ready, channel.State);
+ }
+
private static async Task<string> EchoHandler(string request, ServerCallContext context)
{
foreach (Metadata.Entry metadataEntry in context.RequestHeaders)
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index 242a60d098..f2bf459dc5 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -76,6 +76,7 @@
<Compile Include="Internal\TimespecTest.cs" />
<Compile Include="TimeoutsTest.cs" />
<Compile Include="NUnitVersionTest.cs" />
+ <Compile Include="ChannelTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/NUnitVersionTest.cs b/src/csharp/Grpc.Core.Tests/NUnitVersionTest.cs
index 600df1a18d..3fa6ad09c0 100644
--- a/src/csharp/Grpc.Core.Tests/NUnitVersionTest.cs
+++ b/src/csharp/Grpc.Core.Tests/NUnitVersionTest.cs
@@ -70,10 +70,8 @@ namespace Grpc.Core.Tests
[Test]
public async Task NUnitVersionTest2()
{
- testRunCount ++;
+ testRunCount++;
await Task.Delay(10);
}
-
-
}
}
diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs
index ba9efae871..485006ebac 100644
--- a/src/csharp/Grpc.Core.Tests/ServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Linq;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
@@ -44,11 +45,45 @@ namespace Grpc.Core.Tests
[Test]
public void StartAndShutdownServer()
{
- Server server = new Server();
- server.AddPort("localhost", Server.PickUnusedPort, ServerCredentials.Insecure);
+ Server server = new Server
+ {
+ Ports = { new ServerPort("localhost", ServerPort.PickUnused, ServerCredentials.Insecure) }
+ };
server.Start();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
+
+ [Test]
+ public void PickUnusedPort()
+ {
+ Server server = new Server
+ {
+ Ports = { new ServerPort("localhost", ServerPort.PickUnused, ServerCredentials.Insecure) }
+ };
+
+ var boundPort = server.Ports.Single();
+ Assert.AreEqual(0, boundPort.Port);
+ Assert.Greater(boundPort.BoundPort, 0);
+
+ server.Start();
+ server.ShutdownAsync();
+ GrpcEnvironment.Shutdown();
+ }
+
+ [Test]
+ public void CannotModifyAfterStarted()
+ {
+ Server server = new Server
+ {
+ Ports = { new ServerPort("localhost", ServerPort.PickUnused, ServerCredentials.Insecure) }
+ };
+ server.Start();
+ Assert.Throws(typeof(InvalidOperationException), () => server.Ports.Add("localhost", 9999, ServerCredentials.Insecure));
+ Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build()));
+
+ server.ShutdownAsync().Wait();
+ GrpcEnvironment.Shutdown();
+ }
}
}
diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
index 010ffd898a..d84801fbac 100644
--- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
+++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
@@ -70,11 +70,13 @@ namespace Grpc.Core.Tests
[SetUp]
public void Init()
{
- server = new Server();
- server.AddServiceDefinition(ServiceDefinition);
- int port = server.AddPort(Host, Server.PickUnusedPort, ServerCredentials.Insecure);
+ server = new Server
+ {
+ Services = { ServiceDefinition },
+ Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
+ };
server.Start();
- channel = new Channel(Host, port, Credentials.Insecure);
+ channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure);
stringFromServerHandlerTcs = new TaskCompletionSource<string>();
}
@@ -134,7 +136,8 @@ namespace Grpc.Core.Tests
}
catch (RpcException e)
{
- Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode);
+ // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+ Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
}
@@ -151,7 +154,8 @@ namespace Grpc.Core.Tests
}
catch (RpcException e)
{
- Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode);
+ // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+ Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
}
@@ -168,7 +172,8 @@ namespace Grpc.Core.Tests
}
catch (RpcException e)
{
- Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode);
+ // We can't guarantee the status code is always DeadlineExceeded. See issue #2685.
+ Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
Assert.AreEqual("CANCELLED", stringFromServerHandlerTcs.Task.Result);
}
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 18e6f2fda5..0b69610443 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -37,6 +37,8 @@ using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
+using Grpc.Core.Logging;
+using Grpc.Core.Utils;
namespace Grpc.Core
{
@@ -45,21 +47,23 @@ namespace Grpc.Core
/// </summary>
public class Channel : IDisposable
{
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
+
readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly List<ChannelOption> options;
- readonly string target;
bool disposed;
/// <summary>
/// Creates a channel that connects to a specific host.
- /// Port will default to 80 for an unsecure channel and to 443 a secure channel.
+ /// Port will default to 80 for an unsecure channel and to 443 for a secure channel.
/// </summary>
- /// <param name="host">The DNS name of IP address of the host.</param>
+ /// <param name="host">The name or IP address of the host.</param>
/// <param name="credentials">Credentials to secure the channel.</param>
/// <param name="options">Channel options.</param>
public Channel(string host, Credentials credentials, IEnumerable<ChannelOption> options = null)
{
+ Preconditions.CheckNotNull(host);
this.environment = GrpcEnvironment.GetInstance();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
@@ -76,14 +80,13 @@ namespace Grpc.Core
this.handle = ChannelSafeHandle.CreateInsecure(host, nativeChannelArgs);
}
}
- this.target = GetOverridenTarget(host, this.options);
}
/// <summary>
/// Creates a channel that connects to a specific host and port.
/// </summary>
- /// <param name="host">DNS name or IP address</param>
- /// <param name="port">the port</param>
+ /// <param name="host">The name or IP address of the host.</param>
+ /// <param name="port">The port.</param>
/// <param name="credentials">Credentials to secure the channel.</param>
/// <param name="options">Channel options.</param>
public Channel(string host, int port, Credentials credentials, IEnumerable<ChannelOption> options = null) :
@@ -91,20 +94,82 @@ namespace Grpc.Core
{
}
- public void Dispose()
+ /// <summary>
+ /// Gets current connectivity state of this channel.
+ /// </summary>
+ public ChannelState State
{
- Dispose(true);
- GC.SuppressFinalize(this);
+ get
+ {
+ return handle.CheckConnectivityState(false);
+ }
+ }
+
+ /// <summary>
+ /// Returned tasks completes once channel state has become different from
+ /// given lastObservedState.
+ /// If deadline is reached or and error occurs, returned task is cancelled.
+ /// </summary>
+ public Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
+ {
+ Preconditions.CheckArgument(lastObservedState != ChannelState.FatalFailure,
+ "FatalFailure is a terminal state. No further state changes can occur.");
+ var tcs = new TaskCompletionSource<object>();
+ var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture;
+ var handler = new BatchCompletionDelegate((success, ctx) =>
+ {
+ if (success)
+ {
+ tcs.SetResult(null);
+ }
+ else
+ {
+ tcs.SetCanceled();
+ }
+ });
+ handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler);
+ return tcs.Task;
}
- internal string Target
+ /// <summary> Address of the remote endpoint in URI format.</summary>
+ public string Target
{
get
{
- return target;
+ return handle.GetTarget();
+ }
+ }
+
+ /// <summary>
+ /// Allows explicitly requesting channel to connect without starting an RPC.
+ /// Returned task completes once state Ready was seen. If the deadline is reached,
+ /// or channel enters the FatalFailure state, the task is cancelled.
+ /// There is no need to call this explicitly unless your use case requires that.
+ /// Starting an RPC on a new channel will request connection implicitly.
+ /// </summary>
+ public async Task ConnectAsync(DateTime? deadline = null)
+ {
+ var currentState = handle.CheckConnectivityState(true);
+ while (currentState != ChannelState.Ready)
+ {
+ if (currentState == ChannelState.FatalFailure)
+ {
+ throw new OperationCanceledException("Channel has reached FatalFailure state.");
+ }
+ await WaitForStateChangedAsync(currentState, deadline);
+ currentState = handle.CheckConnectivityState(false);
}
}
+ /// <summary>
+ /// Destroys the underlying channel.
+ /// </summary>
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
internal ChannelSafeHandle Handle
{
get
@@ -159,26 +224,5 @@ namespace Grpc.Core
// TODO(jtattermusch): it would be useful to also provide .NET/mono version.
return string.Format("grpc-csharp/{0}", VersionInfo.CurrentVersion);
}
-
- /// <summary>
- /// Look for SslTargetNameOverride option and return its value instead of originalTarget
- /// if found.
- /// </summary>
- private static string GetOverridenTarget(string originalTarget, IEnumerable<ChannelOption> options)
- {
- if (options == null)
- {
- return originalTarget;
- }
- foreach (var option in options)
- {
- if (option.Type == ChannelOption.OptionType.String
- && option.Name == ChannelOptions.SslTargetNameOverride)
- {
- return option.StringValue;
- }
- }
- return originalTarget;
- }
}
}
diff --git a/src/csharp/Grpc.Core/ChannelOptions.cs b/src/csharp/Grpc.Core/ChannelOptions.cs
index 9fe03d2805..1e0f90287a 100644
--- a/src/csharp/Grpc.Core/ChannelOptions.cs
+++ b/src/csharp/Grpc.Core/ChannelOptions.cs
@@ -30,7 +30,6 @@
#endregion
using System;
using System.Collections.Generic;
-using System.Collections.Immutable;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -135,6 +134,9 @@ namespace Grpc.Core
/// <summary>Initial sequence number for http2 transports</summary>
public const string Http2InitialSequenceNumber = "grpc.http2.initial_sequence_number";
+ /// <summary>Default authority for calls.</summary>
+ public const string DefaultAuthority = "grpc.default_authority";
+
/// <summary>Primary user agent: goes at the start of the user-agent metadata</summary>
public const string PrimaryUserAgentString = "grpc.primary_user_agent";
diff --git a/src/csharp/Grpc.Core/ChannelState.cs b/src/csharp/Grpc.Core/ChannelState.cs
new file mode 100644
index 0000000000..d293b98f75
--- /dev/null
+++ b/src/csharp/Grpc.Core/ChannelState.cs
@@ -0,0 +1,69 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Connectivity state of a channel.
+ /// Based on grpc_connectivity_state from grpc/grpc.h
+ /// </summary>
+ public enum ChannelState
+ {
+ /// <summary>
+ /// Channel is idle
+ /// </summary>
+ Idle,
+
+ /// <summary>
+ /// Channel is connecting
+ /// </summary>
+ Connecting,
+
+ /// <summary>
+ /// Channel is ready for work
+ /// </summary>
+ Ready,
+
+ /// <summary>
+ /// Channel has seen a failure but expects to recover
+ /// </summary>
+ TransientFailure,
+
+ /// <summary>
+ /// Channel has seen a failure that it cannot recover from
+ /// </summary>
+ FatalFailure
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 940a6b8ac0..17add77164 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -44,9 +44,6 @@
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
- <Reference Include="System.Collections.Immutable">
- <HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
- </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncDuplexStreamingCall.cs" />
@@ -55,6 +52,7 @@
<Compile Include="IServerStreamWriter.cs" />
<Compile Include="IAsyncStreamWriter.cs" />
<Compile Include="IAsyncStreamReader.cs" />
+ <Compile Include="ServerPort.cs" />
<Compile Include="Version.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RpcException.cs" />
@@ -115,6 +113,7 @@
<Compile Include="Logging\ILogger.cs" />
<Compile Include="Logging\ConsoleLogger.cs" />
<Compile Include="Internal\NativeLogRedirector.cs" />
+ <Compile Include="ChannelState.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />
@@ -145,7 +144,5 @@
</Target>
<Import Project="..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets')" />
<Import Project="..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets')" />
- <ItemGroup>
- <Folder Include="Logging\" />
- </ItemGroup>
+ <ItemGroup />
</Project> \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec
index 086776f69d..fe49efc7ec 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.nuspec
+++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec
@@ -15,7 +15,6 @@
<copyright>Copyright 2015, Google Inc.</copyright>
<tags>gRPC RPC Protocol HTTP/2</tags>
<dependencies>
- <dependency id="System.Collections.Immutable" version="1.1.36" />
<dependency id="Ix-Async" version="1.2.3" />
<dependency id="grpc.native.csharp_ext" version="$GrpcNativeCsharpExtVersion$" />
</dependencies>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index bfcb9366a1..48f466460f 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -67,7 +67,7 @@ namespace Grpc.Core.Internal
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline)
{
this.channel = channel;
- var call = channel.Handle.CreateCall(channel.CompletionRegistry, cq, methodName, channel.Target, deadline);
+ var call = channel.Handle.CreateCall(channel.CompletionRegistry, cq, methodName, null, deadline);
channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
}
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 20815efbd3..7324ebdf57 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -50,6 +50,16 @@ namespace Grpc.Core.Internal
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
+ static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_channel_watch_connectivity_state(ChannelSafeHandle channel, ChannelState lastObservedState,
+ Timespec deadline, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern CStringSafeHandle grpcsharp_channel_get_target(ChannelSafeHandle call);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_channel_destroy(IntPtr channel);
private ChannelSafeHandle()
@@ -73,6 +83,27 @@ namespace Grpc.Core.Internal
return result;
}
+ public ChannelState CheckConnectivityState(bool tryToConnect)
+ {
+ return grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
+ }
+
+ public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq,
+ CompletionRegistry completionRegistry, BatchCompletionDelegate callback)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
+ }
+
+ public string GetTarget()
+ {
+ using (var cstring = grpcsharp_channel_get_target(this))
+ {
+ return cstring.GetValue();
+ }
+ }
+
protected override bool ReleaseHandle()
{
grpcsharp_channel_destroy(handle);
diff --git a/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs
index 59238a452c..37a4f5256b 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs
@@ -42,7 +42,7 @@ namespace Grpc.Core.Internal
internal class ServerCredentialsSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)]
- static extern ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs);
+ static extern ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_credentials_release(IntPtr credentials);
@@ -51,12 +51,13 @@ namespace Grpc.Core.Internal
{
}
- public static ServerCredentialsSafeHandle CreateSslCredentials(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray)
+ public static ServerCredentialsSafeHandle CreateSslCredentials(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, bool forceClientAuth)
{
Preconditions.CheckArgument(keyCertPairCertChainArray.Length == keyCertPairPrivateKeyArray.Length);
return grpcsharp_ssl_server_credentials_create(pemRootCerts,
keyCertPairCertChainArray, keyCertPairPrivateKeyArray,
- new UIntPtr((ulong)keyCertPairCertChainArray.Length));
+ new UIntPtr((ulong)keyCertPairCertChainArray.Length),
+ forceClientAuth);
}
protected override bool ReleaseHandle()
diff --git a/src/csharp/Grpc.Core/KeyCertificatePair.cs b/src/csharp/Grpc.Core/KeyCertificatePair.cs
index 7cea18618e..5def15a656 100644
--- a/src/csharp/Grpc.Core/KeyCertificatePair.cs
+++ b/src/csharp/Grpc.Core/KeyCertificatePair.cs
@@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
-using System.Collections.Immutable;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs
index 2f308cbb11..6fd0a7109d 100644
--- a/src/csharp/Grpc.Core/Metadata.cs
+++ b/src/csharp/Grpc.Core/Metadata.cs
@@ -32,7 +32,6 @@
using System;
using System.Collections;
using System.Collections.Generic;
-using System.Collections.Immutable;
using System.Collections.Specialized;
using System.Runtime.InteropServices;
using System.Text;
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 3217547cc4..eb5b043d1c 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -32,7 +32,7 @@
#endregion
using System;
-using System.Collections.Concurrent;
+using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
@@ -48,18 +48,17 @@ namespace Grpc.Core
/// </summary>
public class Server
{
- /// <summary>
- /// Pass this value as port to have the server choose an unused listening port for you.
- /// </summary>
- public const int PickUnusedPort = 0;
-
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
+ readonly ServiceDefinitionCollection serviceDefinitions;
+ readonly ServerPortCollection ports;
readonly GrpcEnvironment environment;
readonly List<ChannelOption> options;
readonly ServerSafeHandle handle;
readonly object myLock = new object();
+ readonly List<ServerServiceDefinition> serviceDefinitionsList = new List<ServerServiceDefinition>();
+ readonly List<ServerPort> serverPortList = new List<ServerPort>();
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
@@ -72,6 +71,8 @@ namespace Grpc.Core
/// <param name="options">Channel options.</param>
public Server(IEnumerable<ChannelOption> options = null)
{
+ this.serviceDefinitions = new ServiceDefinitionCollection(this);
+ this.ports = new ServerPortCollection(this);
this.environment = GrpcEnvironment.GetInstance();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
@@ -81,47 +82,26 @@ namespace Grpc.Core
}
/// <summary>
- /// Adds a service definition to the server. This is how you register
- /// handlers for a service with the server.
- /// Only call this before Start().
+ /// Services that will be exported by the server once started. Register a service with this
+ /// server by adding its definition to this collection.
/// </summary>
- public void AddServiceDefinition(ServerServiceDefinition serviceDefinition)
+ public ServiceDefinitionCollection Services
{
- lock (myLock)
+ get
{
- Preconditions.CheckState(!startRequested);
- foreach (var entry in serviceDefinition.CallHandlers)
- {
- callHandlers.Add(entry.Key, entry.Value);
- }
+ return serviceDefinitions;
}
}
/// <summary>
- /// Add a port on which server should listen.
- /// Only call this before Start().
+ /// Ports on which the server will listen once started. Register a port with this
+ /// server by adding its definition to this collection.
/// </summary>
- /// <returns>The port on which server will be listening.</returns>
- /// <param name="host">the host</param>
- /// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
- public int AddPort(string host, int port, ServerCredentials credentials)
+ public ServerPortCollection Ports
{
- lock (myLock)
+ get
{
- Preconditions.CheckNotNull(credentials);
- Preconditions.CheckState(!startRequested);
- var address = string.Format("{0}:{1}", host, port);
- using (var nativeCredentials = credentials.ToNativeCredentials())
- {
- if (nativeCredentials != null)
- {
- return handle.AddSecurePort(address, nativeCredentials);
- }
- else
- {
- return handle.AddInsecurePort(address);
- }
- }
+ return ports;
}
}
@@ -190,6 +170,50 @@ namespace Grpc.Core
}
/// <summary>
+ /// Adds a service definition.
+ /// </summary>
+ private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckState(!startRequested);
+ foreach (var entry in serviceDefinition.CallHandlers)
+ {
+ callHandlers.Add(entry.Key, entry.Value);
+ }
+ serviceDefinitionsList.Add(serviceDefinition);
+ }
+ }
+
+ /// <summary>
+ /// Adds a listening port.
+ /// </summary>
+ private int AddPortInternal(ServerPort serverPort)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(serverPort.Credentials);
+ Preconditions.CheckState(!startRequested);
+ var address = string.Format("{0}:{1}", serverPort.Host, serverPort.Port);
+ int boundPort;
+ using (var nativeCredentials = serverPort.Credentials.ToNativeCredentials())
+ {
+ if (nativeCredentials != null)
+ {
+ boundPort = handle.AddSecurePort(address, nativeCredentials);
+ }
+ else
+ {
+ boundPort = handle.AddInsecurePort(address);
+ }
+ }
+ var newServerPort = new ServerPort(serverPort, boundPort);
+ this.serverPortList.Add(newServerPort);
+ return boundPort;
+ }
+ }
+
+ /// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
private void AllowOneRpc()
@@ -249,5 +273,82 @@ namespace Grpc.Core
{
shutdownTcs.SetResult(null);
}
+
+ /// <summary>
+ /// Collection of service definitions.
+ /// </summary>
+ public class ServiceDefinitionCollection : IEnumerable<ServerServiceDefinition>
+ {
+ readonly Server server;
+
+ internal ServiceDefinitionCollection(Server server)
+ {
+ this.server = server;
+ }
+
+ /// <summary>
+ /// Adds a service definition to the server. This is how you register
+ /// handlers for a service with the server. Only call this before Start().
+ /// </summary>
+ public void Add(ServerServiceDefinition serviceDefinition)
+ {
+ server.AddServiceDefinitionInternal(serviceDefinition);
+ }
+
+ public IEnumerator<ServerServiceDefinition> GetEnumerator()
+ {
+ return server.serviceDefinitionsList.GetEnumerator();
+ }
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return server.serviceDefinitionsList.GetEnumerator();
+ }
+ }
+
+ /// <summary>
+ /// Collection of server ports.
+ /// </summary>
+ public class ServerPortCollection : IEnumerable<ServerPort>
+ {
+ readonly Server server;
+
+ internal ServerPortCollection(Server server)
+ {
+ this.server = server;
+ }
+
+ /// <summary>
+ /// Adds a new port on which server should listen.
+ /// Only call this before Start().
+ /// <returns>The port on which server will be listening.</returns>
+ /// </summary>
+ public int Add(ServerPort serverPort)
+ {
+ return server.AddPortInternal(serverPort);
+ }
+
+ /// <summary>
+ /// Adds a new port on which server should listen.
+ /// <returns>The port on which server will be listening.</returns>
+ /// </summary>
+ /// <param name="host">the host</param>
+ /// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
+ /// <param name="credentials">credentials to use to secure this port.</param>
+ public int Add(string host, int port, ServerCredentials credentials)
+ {
+ return Add(new ServerPort(host, port, credentials));
+ }
+
+ public IEnumerator<ServerPort> GetEnumerator()
+ {
+ return server.serverPortList.GetEnumerator();
+ }
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return server.serverPortList.GetEnumerator();
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/ServerCredentials.cs b/src/csharp/Grpc.Core/ServerCredentials.cs
index 32ed4b78a1..c11a1ede08 100644
--- a/src/csharp/Grpc.Core/ServerCredentials.cs
+++ b/src/csharp/Grpc.Core/ServerCredentials.cs
@@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
-using System.Collections.Immutable;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
@@ -80,18 +79,26 @@ namespace Grpc.Core
{
readonly IList<KeyCertificatePair> keyCertificatePairs;
readonly string rootCertificates;
+ readonly bool forceClientAuth;
/// <summary>
/// Creates server-side SSL credentials.
/// </summary>
- /// <param name="rootCertificates">PEM encoded client root certificates used to authenticate client.</param>
/// <param name="keyCertificatePairs">Key-certificates to use.</param>
- public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs, string rootCertificates)
+ /// <param name="rootCertificates">PEM encoded client root certificates used to authenticate client.</param>
+ /// <param name="forceClientAuth">If true, client will be rejected unless it proves its unthenticity using against rootCertificates.</param>
+ public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs, string rootCertificates, bool forceClientAuth)
{
this.keyCertificatePairs = new List<KeyCertificatePair>(keyCertificatePairs).AsReadOnly();
Preconditions.CheckArgument(this.keyCertificatePairs.Count > 0,
"At least one KeyCertificatePair needs to be provided");
+ if (forceClientAuth)
+ {
+ Preconditions.CheckNotNull(rootCertificates,
+ "Cannot force client authentication unless you provide rootCertificates.");
+ }
this.rootCertificates = rootCertificates;
+ this.forceClientAuth = forceClientAuth;
}
/// <summary>
@@ -100,7 +107,7 @@ namespace Grpc.Core
/// using client root certificates.
/// </summary>
/// <param name="keyCertificatePairs">Key-certificates to use.</param>
- public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs) : this(keyCertificatePairs, null)
+ public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs) : this(keyCertificatePairs, null, false)
{
}
@@ -126,6 +133,17 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// If true, the authenticity of client check will be enforced.
+ /// </summary>
+ public bool ForceClientAuthentication
+ {
+ get
+ {
+ return this.forceClientAuth;
+ }
+ }
+
internal override ServerCredentialsSafeHandle ToNativeCredentials()
{
int count = keyCertificatePairs.Count;
@@ -136,7 +154,7 @@ namespace Grpc.Core
certChains[i] = keyCertificatePairs[i].CertificateChain;
keys[i] = keyCertificatePairs[i].PrivateKey;
}
- return ServerCredentialsSafeHandle.CreateSslCredentials(rootCertificates, certChains, keys);
+ return ServerCredentialsSafeHandle.CreateSslCredentials(rootCertificates, certChains, keys, forceClientAuth);
}
}
}
diff --git a/src/csharp/Grpc.Core/ServerPort.cs b/src/csharp/Grpc.Core/ServerPort.cs
new file mode 100644
index 0000000000..55e4bd0062
--- /dev/null
+++ b/src/csharp/Grpc.Core/ServerPort.cs
@@ -0,0 +1,120 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+using Grpc.Core.Utils;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// A port exposed by a server.
+ /// </summary>
+ public class ServerPort
+ {
+ /// <summary>
+ /// Pass this value as port to have the server choose an unused listening port for you.
+ /// Ports added to a server will contain the bound port in their <see cref="BoundPort"/> property.
+ /// </summary>
+ public const int PickUnused = 0;
+
+ readonly string host;
+ readonly int port;
+ readonly ServerCredentials credentials;
+ readonly int boundPort;
+
+ /// <summary>
+ /// Creates a new port on which server should listen.
+ /// </summary>
+ /// <returns>The port on which server will be listening.</returns>
+ /// <param name="host">the host</param>
+ /// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
+ /// <param name="credentials">credentials to use to secure this port.</param>
+ public ServerPort(string host, int port, ServerCredentials credentials)
+ {
+ this.host = Preconditions.CheckNotNull(host);
+ this.port = port;
+ this.credentials = Preconditions.CheckNotNull(credentials);
+ }
+
+ /// <summary>
+ /// Creates a port from an existing <c>ServerPort</c> instance and boundPort value.
+ /// </summary>
+ internal ServerPort(ServerPort serverPort, int boundPort)
+ {
+ this.host = serverPort.host;
+ this.port = serverPort.port;
+ this.credentials = serverPort.credentials;
+ this.boundPort = boundPort;
+ }
+
+ /// <value>The host.</value>
+ public string Host
+ {
+ get
+ {
+ return host;
+ }
+ }
+
+ /// <value>The port.</value>
+ public int Port
+ {
+ get
+ {
+ return port;
+ }
+ }
+
+ /// <value>The server credentials.</value>
+ public ServerCredentials Credentials
+ {
+ get
+ {
+ return credentials;
+ }
+ }
+
+ /// <value>
+ /// The port actually bound by the server. This is useful if you let server
+ /// pick port automatically. <see cref="PickUnused"/>
+ /// </value>
+ public int BoundPort
+ {
+ get
+ {
+ return boundPort;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs
index b180186c12..a00d156e52 100644
--- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs
+++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs
@@ -33,7 +33,7 @@
using System;
using System.Collections.Generic;
-using System.Collections.Immutable;
+using System.Collections.ObjectModel;
using Grpc.Core.Internal;
namespace Grpc.Core
@@ -43,14 +43,14 @@ namespace Grpc.Core
/// </summary>
public class ServerServiceDefinition
{
- readonly ImmutableDictionary<string, IServerCallHandler> callHandlers;
+ readonly ReadOnlyDictionary<string, IServerCallHandler> callHandlers;
- private ServerServiceDefinition(ImmutableDictionary<string, IServerCallHandler> callHandlers)
+ private ServerServiceDefinition(Dictionary<string, IServerCallHandler> callHandlers)
{
- this.callHandlers = callHandlers;
+ this.callHandlers = new ReadOnlyDictionary<string, IServerCallHandler>(callHandlers);
}
- internal ImmutableDictionary<string, IServerCallHandler> CallHandlers
+ internal IDictionary<string, IServerCallHandler> CallHandlers
{
get
{
@@ -115,7 +115,7 @@ namespace Grpc.Core
public ServerServiceDefinition Build()
{
- return new ServerServiceDefinition(callHandlers.ToImmutableDictionary());
+ return new ServerServiceDefinition(callHandlers);
}
}
}
diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config
index 6cdcdf2656..9b12b9b096 100644
--- a/src/csharp/Grpc.Core/packages.config
+++ b/src/csharp/Grpc.Core/packages.config
@@ -3,5 +3,4 @@
<package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" />
<package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
- <package id="System.Collections.Immutable" version="1.1.36" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
index 468eefbe3e..5f7e717b0c 100644
--- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs
+++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
@@ -38,16 +38,19 @@ namespace math
{
class MainClass
{
+ const string Host = "0.0.0.0";
+ const int Port = 23456;
+
public static void Main(string[] args)
{
- string host = "0.0.0.0";
-
- Server server = new Server();
- server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
- int port = server.AddPort(host, 23456, ServerCredentials.Insecure);
+ Server server = new Server
+ {
+ Services = { Math.BindService(new MathServiceImpl()) },
+ Ports = { { Host, Port, ServerCredentials.Insecure } }
+ };
server.Start();
- Console.WriteLine("MathServer listening on port " + port);
+ Console.WriteLine("MathServer listening on port " + Port);
Console.WriteLine("Press any key to stop the server...");
Console.ReadKey();
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index 242d29a9a5..08aece7ef2 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
@@ -46,7 +47,7 @@ namespace math.Tests
/// </summary>
public class MathClientServerTest
{
- string host = "localhost";
+ const string Host = "localhost";
Server server;
Channel channel;
Math.MathClient client;
@@ -54,19 +55,14 @@ namespace math.Tests
[TestFixtureSetUp]
public void Init()
{
- server = new Server();
- server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
- int port = server.AddPort(host, Server.PickUnusedPort, ServerCredentials.Insecure);
- server.Start();
- channel = new Channel(host, port, Credentials.Insecure);
- client = Math.NewClient(channel);
-
- // TODO(jtattermusch): get rid of the custom header here once we have dedicated tests
- // for header support.
- client.HeaderInterceptor = (metadata) =>
+ server = new Server
{
- metadata.Add(new Metadata.Entry("custom-header", "abcdef"));
+ Services = { Math.BindService(new MathServiceImpl()) },
+ Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
};
+ server.Start();
+ channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure);
+ client = Math.NewClient(channel);
}
[TestFixtureTearDown]
diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
index 9d89698a8f..024377e216 100644
--- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
+++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
@@ -57,11 +57,13 @@ namespace Grpc.HealthCheck.Tests
{
serviceImpl = new HealthServiceImpl();
- server = new Server();
- server.AddServiceDefinition(Grpc.Health.V1Alpha.Health.BindService(serviceImpl));
- int port = server.AddPort(Host, Server.PickUnusedPort, ServerCredentials.Insecure);
+ server = new Server
+ {
+ Services = { Grpc.Health.V1Alpha.Health.BindService(serviceImpl) },
+ Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
+ };
server.Start();
- channel = new Channel(Host, port, Credentials.Insecure);
+ channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure);
client = Grpc.Health.V1Alpha.Health.NewClient(channel);
}
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index abc27f811e..06a75a3351 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -87,9 +87,6 @@
<Reference Include="Microsoft.Threading.Tasks.Extensions.Desktop">
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.Desktop.dll</HintPath>
</Reference>
- <Reference Include="System.Collections.Immutable">
- <HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
- </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="..\Grpc.Core\Version.cs">
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
index 2756ce97aa..6fa721bc1c 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
@@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using grpc.testing;
@@ -47,7 +48,7 @@ namespace Grpc.IntegrationTesting
/// </summary>
public class InteropClientServerTest
{
- string host = "localhost";
+ const string Host = "localhost";
Server server;
Channel channel;
TestService.ITestServiceClient client;
@@ -55,16 +56,19 @@ namespace Grpc.IntegrationTesting
[TestFixtureSetUp]
public void Init()
{
- server = new Server();
- server.AddServiceDefinition(TestService.BindService(new TestServiceImpl()));
- int port = server.AddPort(host, Server.PickUnusedPort, TestCredentials.CreateTestServerCredentials());
+ server = new Server
+ {
+ Services = { TestService.BindService(new TestServiceImpl()) },
+ Ports = { { Host, ServerPort.PickUnused, TestCredentials.CreateTestServerCredentials() } }
+ };
server.Start();
var options = new List<ChannelOption>
{
new ChannelOption(ChannelOptions.SslTargetNameOverride, TestCredentials.DefaultHostOverride)
};
- channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), options);
+ int port = server.Ports.Single().BoundPort;
+ channel = new Channel(Host, port, TestCredentials.CreateTestClientCredentials(true), options);
client = TestService.NewClient(channel);
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
index bf6947e09d..504fd11857 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
@@ -88,18 +88,20 @@ namespace Grpc.IntegrationTesting
private void Run()
{
- var server = new Server();
- server.AddServiceDefinition(TestService.BindService(new TestServiceImpl()));
+ var server = new Server
+ {
+ Services = { TestService.BindService(new TestServiceImpl()) }
+ };
string host = "0.0.0.0";
int port = options.port.Value;
if (options.useTls)
{
- server.AddPort(host, port, TestCredentials.CreateTestServerCredentials());
+ server.Ports.Add(host, port, TestCredentials.CreateTestServerCredentials());
}
else
{
- server.AddPort(host, options.port.Value, ServerCredentials.Insecure);
+ server.Ports.Add(host, options.port.Value, ServerCredentials.Insecure);
}
Console.WriteLine("Running server on " + string.Format("{0}:{1}", host, port));
server.Start();
diff --git a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs
index 1baf40eea2..1c398eb84e 100644
--- a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs
@@ -34,6 +34,7 @@
using System;
using System.Collections.Generic;
using System.IO;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using grpc.testing;
@@ -49,7 +50,7 @@ namespace Grpc.IntegrationTesting
/// </summary>
public class SslCredentialsTest
{
- string host = "localhost";
+ const string Host = "localhost";
Server server;
Channel channel;
TestService.ITestServiceClient client;
@@ -62,12 +63,14 @@ namespace Grpc.IntegrationTesting
File.ReadAllText(TestCredentials.ServerCertChainPath),
File.ReadAllText(TestCredentials.ServerPrivateKeyPath));
- var serverCredentials = new SslServerCredentials(new[] { keyCertPair }, rootCert);
+ var serverCredentials = new SslServerCredentials(new[] { keyCertPair }, rootCert, true);
var clientCredentials = new SslCredentials(rootCert, keyCertPair);
- server = new Server();
- server.AddServiceDefinition(TestService.BindService(new TestServiceImpl()));
- int port = server.AddPort(host, Server.PickUnusedPort, serverCredentials);
+ server = new Server
+ {
+ Services = { TestService.BindService(new TestServiceImpl()) },
+ Ports = { { Host, ServerPort.PickUnused, serverCredentials } }
+ };
server.Start();
var options = new List<ChannelOption>
@@ -75,7 +78,7 @@ namespace Grpc.IntegrationTesting
new ChannelOption(ChannelOptions.SslTargetNameOverride, TestCredentials.DefaultHostOverride)
};
- channel = new Channel(host, port, clientCredentials, options);
+ channel = new Channel(Host, server.Ports.Single().BoundPort, clientCredentials, options);
client = TestService.NewClient(channel);
}
diff --git a/src/csharp/Grpc.IntegrationTesting/TestCredentials.cs b/src/csharp/Grpc.IntegrationTesting/TestCredentials.cs
index 54d8587713..da0b7fb910 100644
--- a/src/csharp/Grpc.IntegrationTesting/TestCredentials.cs
+++ b/src/csharp/Grpc.IntegrationTesting/TestCredentials.cs
@@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
-using System.Collections.Immutable;
using System.Diagnostics;
using System.IO;
using System.Text.RegularExpressions;
diff --git a/src/csharp/Grpc.IntegrationTesting/packages.config b/src/csharp/Grpc.IntegrationTesting/packages.config
index 746133a7a5..7d1f84f303 100644
--- a/src/csharp/Grpc.IntegrationTesting/packages.config
+++ b/src/csharp/Grpc.IntegrationTesting/packages.config
@@ -11,5 +11,4 @@
<package id="Microsoft.Net.Http" version="2.2.29" targetFramework="net45" />
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
- <package id="System.Collections.Immutable" version="1.1.36" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 49a0471042..37864a62a5 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -382,6 +382,22 @@ grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq,
return grpc_channel_create_call(channel, cq, method, host, deadline);
}
+GPR_EXPORT grpc_connectivity_state GPR_CALLTYPE
+grpcsharp_channel_check_connectivity_state(grpc_channel *channel, gpr_int32 try_to_connect) {
+ return grpc_channel_check_connectivity_state(channel, try_to_connect);
+}
+
+GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_watch_connectivity_state(
+ grpc_channel *channel, grpc_connectivity_state last_observed_state,
+ gpr_timespec deadline, grpc_completion_queue *cq, grpcsharp_batch_context *ctx) {
+ grpc_channel_watch_connectivity_state(channel, last_observed_state,
+ deadline, cq, ctx);
+}
+
+GPR_EXPORT char *GPR_CALLTYPE grpcsharp_channel_get_target(grpc_channel *channel) {
+ return grpc_channel_get_target(channel);
+}
+
/* Channel args */
GPR_EXPORT grpc_channel_args *GPR_CALLTYPE
@@ -715,7 +731,7 @@ grpcsharp_server_create(grpc_completion_queue *cq,
GPR_EXPORT gpr_int32 GPR_CALLTYPE
grpcsharp_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
- return grpc_server_add_http2_port(server, addr);
+ return grpc_server_add_insecure_http2_port(server, addr);
}
GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) {
@@ -776,7 +792,8 @@ grpcsharp_secure_channel_create(grpc_credentials *creds, const char *target,
GPR_EXPORT grpc_server_credentials *GPR_CALLTYPE
grpcsharp_ssl_server_credentials_create(
const char *pem_root_certs, const char **key_cert_pair_cert_chain_array,
- const char **key_cert_pair_private_key_array, size_t num_key_cert_pairs) {
+ const char **key_cert_pair_private_key_array, size_t num_key_cert_pairs,
+ int force_client_auth) {
size_t i;
grpc_server_credentials *creds;
grpc_ssl_pem_key_cert_pair *key_cert_pairs =
@@ -791,9 +808,9 @@ grpcsharp_ssl_server_credentials_create(
key_cert_pairs[i].private_key = key_cert_pair_private_key_array[i];
}
}
- /* TODO: Add a force_client_auth parameter and pass it here. */
creds = grpc_ssl_server_credentials_create(pem_root_certs, key_cert_pairs,
- num_key_cert_pairs, 0);
+ num_key_cert_pairs,
+ force_client_auth);
gpr_free(key_cert_pairs);
return creds;
}
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index 04fabc871d..1dc179db3d 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -265,8 +265,8 @@ NAN_METHOD(Server::AddHttp2Port) {
grpc_server_credentials *creds = creds_object->GetWrappedServerCredentials();
int port;
if (creds == NULL) {
- port = grpc_server_add_http2_port(server->wrapped_server,
- *NanUtf8String(args[0]));
+ port = grpc_server_add_insecure_http2_port(server->wrapped_server,
+ *NanUtf8String(args[0]));
} else {
port = grpc_server_add_secure_http2_port(server->wrapped_server,
*NanUtf8String(args[0]),
diff --git a/src/objective-c/generated_libraries/RemoteTestClient/RemoteTest.podspec b/src/objective-c/generated_libraries/RemoteTestClient/RemoteTest.podspec
index 6b00efebb8..8710753e59 100644
--- a/src/objective-c/generated_libraries/RemoteTestClient/RemoteTest.podspec
+++ b/src/objective-c/generated_libraries/RemoteTestClient/RemoteTest.podspec
@@ -8,7 +8,10 @@ Pod::Spec.new do |s|
# Run protoc with the Objective-C and gRPC plugins to generate protocol messages and gRPC clients.
s.prepare_command = <<-CMD
- protoc --plugin=protoc-gen-grpc=../../../../bins/$CONFIG/grpc_objective_c_plugin --objc_out=. --grpc_out=. *.proto
+ BINDIR=../../../../bins/$CONFIG
+ PROTOC=$BINDIR/protobuf/protoc
+ PLUGIN=$BINDIR/grpc_objective_c_plugin
+ $PROTOC --plugin=protoc-gen-grpc=$PLUGIN --objc_out=. --grpc_out=. *.proto
CMD
s.subspec "Messages" do |ms|
diff --git a/src/objective-c/generated_libraries/RouteGuideClient/RouteGuide.podspec b/src/objective-c/generated_libraries/RouteGuideClient/RouteGuide.podspec
index 2c9cead7cf..23ccffe69d 100644
--- a/src/objective-c/generated_libraries/RouteGuideClient/RouteGuide.podspec
+++ b/src/objective-c/generated_libraries/RouteGuideClient/RouteGuide.podspec
@@ -8,7 +8,10 @@ Pod::Spec.new do |s|
# Run protoc with the Objective-C and gRPC plugins to generate protocol messages and gRPC clients.
s.prepare_command = <<-CMD
- protoc --plugin=protoc-gen-grpc=../../../../bins/$CONFIG/grpc_objective_c_plugin --objc_out=. --grpc_out=. *.proto
+ BINDIR=../../../../bins/$CONFIG
+ PROTOC=$BINDIR/protobuf/protoc
+ PLUGIN=$BINDIR/grpc_objective_c_plugin
+ $PROTOC --plugin=protoc-gen-grpc=$PLUGIN --objc_out=. --grpc_out=. *.proto
CMD
s.subspec "Messages" do |ms|
diff --git a/src/objective-c/tests/build_tests.sh b/src/objective-c/tests/build_tests.sh
index d98e0a769c..e7ad31e403 100755
--- a/src/objective-c/tests/build_tests.sh
+++ b/src/objective-c/tests/build_tests.sh
@@ -28,12 +28,39 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+# Don't run this script standalone. Instead, run from the repository root:
+# ./tools/run_tests/run_tests.py -l objc
+
set -e
cd $(dirname $0)
-# The local test server needs to be compiled before this because pod install of
-# gRPC renames some C gRPC files and not the server's code references to them.
-#
-# Suppress error output because Cocoapods issue #3823 causes a flooding warning.
-pod install 2>/dev/null
+hash pod 2>/dev/null || { echo >&2 "Cocoapods needs to be installed."; exit 1; }
+hash xcodebuild 2>/dev/null || {
+ echo >&2 "XCode command-line tools need to be installed."
+ exit 1
+}
+
+BINDIR=../../../bins/$CONFIG
+
+if [ ! -f $BINDIR/protobuf/protoc ]; then
+ hash protoc 2>/dev/null || {
+ echo >&2 "Can't find protoc. Make sure run_tests.py is making" \
+ "grpc_objective_c_plugin before calling this script."
+ exit 1
+ }
+ # When protoc is already installed, make doesn't compile one. Put a link
+ # there so the podspecs can do codegen using that path.
+ mkdir -p $BINDIR/protobuf
+ ln -s `which protoc` $BINDIR/protobuf/protoc
+fi
+
+[ -f $BINDIR/interop_server ] || {
+ echo >&2 "Can't find the test server. Make sure run_tests.py is making" \
+ "interop_server before calling this script. It needs to be done" \
+ "before because pod install of gRPC renames some C gRPC files" \
+ "and not the server's code references to them."
+ exit 1
+}
+
+pod install
diff --git a/src/objective-c/tests/run_tests.sh b/src/objective-c/tests/run_tests.sh
index 9afec687d6..b13c0f0633 100755
--- a/src/objective-c/tests/run_tests.sh
+++ b/src/objective-c/tests/run_tests.sh
@@ -28,6 +28,9 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+# Don't run this script standalone. Instead, run from the repository root:
+# ./tools/run_tests/run_tests.py -l objc
+
set -e
cd $(dirname $0)
diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c
index 8b8d5b2f47..d58aa884ca 100644
--- a/src/php/ext/grpc/server.c
+++ b/src/php/ext/grpc/server.c
@@ -182,7 +182,7 @@ PHP_METHOD(Server, addHttp2Port) {
"add_http2_port expects a string", 1 TSRMLS_CC);
return;
}
- RETURN_LONG(grpc_server_add_http2_port(server->wrapped, addr));
+ RETURN_LONG(grpc_server_add_insecure_http2_port(server->wrapped, addr));
}
PHP_METHOD(Server, addSecureHttp2Port) {
diff --git a/src/python/grpcio/.gitignore b/src/python/grpcio/.gitignore
index efbe1737ba..4c02b8d14d 100644
--- a/src/python/grpcio/.gitignore
+++ b/src/python/grpcio/.gitignore
@@ -6,3 +6,4 @@ dist/
*.egg/
*.eggs/
doc/
+_grpcio_metadata.py
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py
index 605d9d5612..89c0fbf0f3 100644
--- a/src/python/grpcio/commands.py
+++ b/src/python/grpcio/commands.py
@@ -34,6 +34,7 @@ import os.path
import sys
import setuptools
+from setuptools.command import build_py
_CONF_PY_ADDENDUM = """
extensions.append('sphinx.ext.napoleon')
@@ -74,3 +75,28 @@ class SphinxDocumentation(setuptools.Command):
conf_file.write(_CONF_PY_ADDENDUM)
sphinx.main(['', os.path.join('doc', 'src'), os.path.join('doc', 'build')])
+
+class BuildProjectMetadata(setuptools.Command):
+ """Command to generate project metadata in a module."""
+
+ description = ''
+ user_options = []
+
+ def initialize_options(self):
+ pass
+
+ def finalize_options(self):
+ pass
+
+ def run(self):
+ with open('grpc/_grpcio_metadata.py', 'w') as module_file:
+ module_file.write('__version__ = """{}"""'.format(
+ self.distribution.get_version()))
+
+
+class BuildPy(build_py.build_py):
+ """Custom project build command."""
+
+ def run(self):
+ self.run_command('build_project_metadata')
+ build_py.build_py.run(self)
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/server.c b/src/python/grpcio/grpc/_adapter/_c/types/server.c
index 2a00f34039..c2190ea672 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/server.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/server.c
@@ -155,7 +155,7 @@ PyObject *pygrpc_Server_add_http2_port(
port = grpc_server_add_secure_http2_port(
self->c_serv, addr, creds->c_creds);
} else {
- port = grpc_server_add_http2_port(self->c_serv, addr);
+ port = grpc_server_add_insecure_http2_port(self->c_serv, addr);
}
return PyInt_FromLong(port);
diff --git a/src/python/grpcio/setup.py b/src/python/grpcio/setup.py
index e408f2ace9..caa71a4f7c 100644
--- a/src/python/grpcio/setup.py
+++ b/src/python/grpcio/setup.py
@@ -98,6 +98,8 @@ _SETUP_REQUIRES = (
_COMMAND_CLASS = {
'doc': commands.SphinxDocumentation,
+ 'build_project_metadata': commands.BuildProjectMetadata,
+ 'build_py': commands.BuildPy,
}
setuptools.setup(
diff --git a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py
index 0531fa1d33..9cdc9620f0 100644
--- a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py
+++ b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py
@@ -35,6 +35,7 @@ from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc._links import service
from grpc.framework.interfaces.links import links
+from grpc_test import test_common
from grpc_test._links import _proto_scenarios
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.links import test_cases
@@ -94,12 +95,11 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
return _intermediary_low.Code.OK, 'An exuberant test "details" message!'
def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
- # we need to filter out any additional metadata added in transmitted_metadata
- # since implementations are allowed to add to what is sent (in any position)
- keys, _ = zip(*original_metadata)
- self.assertSequenceEqual(
- original_metadata,
- [x for x in transmitted_metadata if x[0] in keys])
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ original_metadata, transmitted_metadata),
+ '%s erroneously transmitted as %s' % (
+ original_metadata, transmitted_metadata))
class RoundTripTest(unittest.TestCase):
diff --git a/src/python/grpcio_test/grpc_test/test_common.py b/src/python/grpcio_test/grpc_test/test_common.py
new file mode 100644
index 0000000000..f8e1f1e43f
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/test_common.py
@@ -0,0 +1,71 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Common code used throughout tests of gRPC."""
+
+import collections
+
+
+def metadata_transmitted(original_metadata, transmitted_metadata):
+ """Judges whether or not metadata was acceptably transmitted.
+
+ gRPC is allowed to insert key-value pairs into the metadata values given by
+ applications and to reorder key-value pairs with different keys but it is not
+ allowed to alter existing key-value pairs or to reorder key-value pairs with
+ the same key.
+
+ Args:
+ original_metadata: A metadata value used in a test of gRPC.
+ transmitted_metadata: A metadata value corresponding to original_metadata
+ after having been transmitted via gRPC.
+
+ Returns:
+ A boolean indicating whether transmitted_metadata accurately reflects
+ original_metadata after having been transmitted via gRPC.
+ """
+ original = collections.defaultdict(list)
+ for key, value in original_metadata:
+ original[key].append(value)
+ transmitted = collections.defaultdict(list)
+ for key, value in transmitted_metadata:
+ transmitted[key].append(value)
+
+ for key, values in original.iteritems():
+ transmitted_values = transmitted[key]
+ transmitted_iterator = iter(transmitted_values)
+ try:
+ for value in values:
+ while True:
+ transmitted_value = next(transmitted_iterator)
+ if value == transmitted_value:
+ break
+ except StopIteration:
+ return False
+ else:
+ return True
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 375a651d24..79a4ae8757 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -357,7 +357,8 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) {
rb_raise(rb_eRuntimeError, "destroyed!");
return Qnil;
} else if (rb_creds == Qnil) {
- recvd_port = grpc_server_add_http2_port(s->wrapped, StringValueCStr(port));
+ recvd_port =
+ grpc_server_add_insecure_http2_port(s->wrapped, StringValueCStr(port));
if (recvd_port == 0) {
rb_raise(rb_eRuntimeError,
"could not add port %s to server, not sure why",