aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Adele Zhou <adelez@google.com>2017-08-10 15:47:09 -0700
committerGravatar Adele Zhou <adelez@google.com>2017-08-10 15:47:09 -0700
commitf2bd22131a8c517c4ca399cbb39cb788fcdc52b9 (patch)
tree5c8ff699280e1e5963d9a83833ea302b1ff396b1
parentc210147eabc03e6c4e1fa3cb8a7b8f44dd1cda9d (diff)
parent5020562df2f7e96705c5d3550e84e76437104cdd (diff)
Merge branch 'master' of github.com:grpc/grpc
-rw-r--r--BUILD26
-rw-r--r--CMakeLists.txt4
-rw-r--r--Makefile4
-rw-r--r--binding.gyp2
-rw-r--r--build.yaml21
-rw-r--r--config.m42
-rw-r--r--config.w322
-rw-r--r--gRPC-Core.podspec6
-rw-r--r--grpc.gemspec4
-rw-r--r--package.xml4
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c398
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.h6
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.c12
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.h2
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs74
-rw-r--r--src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs15
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs19
-rw-r--r--src/csharp/Grpc.Core/RpcException.cs28
-rw-r--r--src/csharp/Grpc.IntegrationTesting/CustomErrorDetailsTest.cs21
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py2
-rw-r--r--test/core/security/credentials_test.c7
-rw-r--r--test/cpp/qps/server_async.cc3
-rw-r--r--tools/internal_ci/windows/grpc_portability_build_only.cfg (renamed from tools/internal_ci/windows/grpc_portability_master.cfg)2
-rw-r--r--tools/run_tests/generated/sources_and_headers.json30
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj6
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj.filters12
28 files changed, 563 insertions, 174 deletions
diff --git a/BUILD b/BUILD
index 19d019bc51..49c340d070 100644
--- a/BUILD
+++ b/BUILD
@@ -1412,13 +1412,29 @@ grpc_cc_library(
)
grpc_cc_library(
+ name = "tsi_interface",
+ srcs = [
+ "src/core/tsi/transport_security.c",
+ "src/core/tsi/transport_security_adapter.c",
+ ],
+ hdrs = [
+ "src/core/tsi/transport_security.h",
+ "src/core/tsi/transport_security_adapter.h",
+ "src/core/tsi/transport_security_interface.h",
+ ],
+ language = "c",
+ deps = [
+ "gpr",
+ "grpc_trace",
+ ],
+)
+
+grpc_cc_library(
name = "tsi",
srcs = [
"src/core/tsi/fake_transport_security.c",
"src/core/tsi/gts_transport_security.c",
"src/core/tsi/ssl_transport_security.c",
- "src/core/tsi/transport_security.c",
- "src/core/tsi/transport_security_adapter.c",
"src/core/tsi/transport_security_grpc.c",
],
hdrs = [
@@ -1426,19 +1442,15 @@ grpc_cc_library(
"src/core/tsi/gts_transport_security.h",
"src/core/tsi/ssl_transport_security.h",
"src/core/tsi/ssl_types.h",
- "src/core/tsi/transport_security.h",
- "src/core/tsi/transport_security_adapter.h",
"src/core/tsi/transport_security_grpc.h",
- "src/core/tsi/transport_security_interface.h",
],
external_deps = [
"libssl",
],
language = "c",
deps = [
- "gpr",
"grpc_base",
- "grpc_trace",
+ "tsi_interface",
],
)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 128f44c277..8dc4758d23 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1132,9 +1132,9 @@ add_library(grpc
src/core/tsi/fake_transport_security.c
src/core/tsi/gts_transport_security.c
src/core/tsi/ssl_transport_security.c
+ src/core/tsi/transport_security_grpc.c
src/core/tsi/transport_security.c
src/core/tsi/transport_security_adapter.c
- src/core/tsi/transport_security_grpc.c
src/core/ext/transport/chttp2/server/chttp2_server.c
src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
src/core/ext/filters/client_channel/channel_connectivity.c
@@ -1504,9 +1504,9 @@ add_library(grpc_cronet
src/core/tsi/fake_transport_security.c
src/core/tsi/gts_transport_security.c
src/core/tsi/ssl_transport_security.c
+ src/core/tsi/transport_security_grpc.c
src/core/tsi/transport_security.c
src/core/tsi/transport_security_adapter.c
- src/core/tsi/transport_security_grpc.c
src/core/ext/transport/chttp2/client/chttp2_connector.c
src/core/ext/filters/load_reporting/load_reporting.c
src/core/ext/filters/load_reporting/load_reporting_filter.c
diff --git a/Makefile b/Makefile
index 0d5dad2a4b..74f05f592d 100644
--- a/Makefile
+++ b/Makefile
@@ -3079,9 +3079,9 @@ LIBGRPC_SRC = \
src/core/tsi/fake_transport_security.c \
src/core/tsi/gts_transport_security.c \
src/core/tsi/ssl_transport_security.c \
+ src/core/tsi/transport_security_grpc.c \
src/core/tsi/transport_security.c \
src/core/tsi/transport_security_adapter.c \
- src/core/tsi/transport_security_grpc.c \
src/core/ext/transport/chttp2/server/chttp2_server.c \
src/core/ext/transport/chttp2/client/secure/secure_channel_create.c \
src/core/ext/filters/client_channel/channel_connectivity.c \
@@ -3449,9 +3449,9 @@ LIBGRPC_CRONET_SRC = \
src/core/tsi/fake_transport_security.c \
src/core/tsi/gts_transport_security.c \
src/core/tsi/ssl_transport_security.c \
+ src/core/tsi/transport_security_grpc.c \
src/core/tsi/transport_security.c \
src/core/tsi/transport_security_adapter.c \
- src/core/tsi/transport_security_grpc.c \
src/core/ext/transport/chttp2/client/chttp2_connector.c \
src/core/ext/filters/load_reporting/load_reporting.c \
src/core/ext/filters/load_reporting/load_reporting_filter.c \
diff --git a/binding.gyp b/binding.gyp
index 226e745844..bbefd05c20 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -841,9 +841,9 @@
'src/core/tsi/fake_transport_security.c',
'src/core/tsi/gts_transport_security.c',
'src/core/tsi/ssl_transport_security.c',
+ 'src/core/tsi/transport_security_grpc.c',
'src/core/tsi/transport_security.c',
'src/core/tsi/transport_security_adapter.c',
- 'src/core/tsi/transport_security_grpc.c',
'src/core/ext/transport/chttp2/server/chttp2_server.c',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.c',
'src/core/ext/filters/client_channel/channel_connectivity.c',
diff --git a/build.yaml b/build.yaml
index 0c326fee02..a459b9d688 100644
--- a/build.yaml
+++ b/build.yaml
@@ -925,24 +925,33 @@ filegroups:
- src/core/tsi/gts_transport_security.h
- src/core/tsi/ssl_transport_security.h
- src/core/tsi/ssl_types.h
- - src/core/tsi/transport_security.h
- - src/core/tsi/transport_security_adapter.h
- src/core/tsi/transport_security_grpc.h
- - src/core/tsi/transport_security_interface.h
src:
- src/core/tsi/fake_transport_security.c
- src/core/tsi/gts_transport_security.c
- src/core/tsi/ssl_transport_security.c
- - src/core/tsi/transport_security.c
- - src/core/tsi/transport_security_adapter.c
- src/core/tsi/transport_security_grpc.c
deps:
- gpr
plugin: grpc_tsi_gts
secure: true
uses:
- - grpc_trace
+ - tsi_interface
- grpc_base
+ - grpc_trace
+- name: tsi_interface
+ headers:
+ - src/core/tsi/transport_security.h
+ - src/core/tsi/transport_security_adapter.h
+ - src/core/tsi/transport_security_interface.h
+ src:
+ - src/core/tsi/transport_security.c
+ - src/core/tsi/transport_security_adapter.c
+ deps:
+ - gpr
+ secure: true
+ uses:
+ - grpc_trace
- name: grpc++_codegen_base
language: c++
public_headers:
diff --git a/config.m4 b/config.m4
index 066339423c..f6f8531b2f 100644
--- a/config.m4
+++ b/config.m4
@@ -270,9 +270,9 @@ if test "$PHP_GRPC" != "no"; then
src/core/tsi/fake_transport_security.c \
src/core/tsi/gts_transport_security.c \
src/core/tsi/ssl_transport_security.c \
+ src/core/tsi/transport_security_grpc.c \
src/core/tsi/transport_security.c \
src/core/tsi/transport_security_adapter.c \
- src/core/tsi/transport_security_grpc.c \
src/core/ext/transport/chttp2/server/chttp2_server.c \
src/core/ext/transport/chttp2/client/secure/secure_channel_create.c \
src/core/ext/filters/client_channel/channel_connectivity.c \
diff --git a/config.w32 b/config.w32
index 12db12cae8..1d1a0a4b63 100644
--- a/config.w32
+++ b/config.w32
@@ -247,9 +247,9 @@ if (PHP_GRPC != "no") {
"src\\core\\tsi\\fake_transport_security.c " +
"src\\core\\tsi\\gts_transport_security.c " +
"src\\core\\tsi\\ssl_transport_security.c " +
+ "src\\core\\tsi\\transport_security_grpc.c " +
"src\\core\\tsi\\transport_security.c " +
"src\\core\\tsi\\transport_security_adapter.c " +
- "src\\core\\tsi\\transport_security_grpc.c " +
"src\\core\\ext\\transport\\chttp2\\server\\chttp2_server.c " +
"src\\core\\ext\\transport\\chttp2\\client\\secure\\secure_channel_create.c " +
"src\\core\\ext\\filters\\client_channel\\channel_connectivity.c " +
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 0815e83467..4b1a8f38a3 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -290,9 +290,9 @@ Pod::Spec.new do |s|
'src/core/tsi/gts_transport_security.h',
'src/core/tsi/ssl_transport_security.h',
'src/core/tsi/ssl_types.h',
+ 'src/core/tsi/transport_security_grpc.h',
'src/core/tsi/transport_security.h',
'src/core/tsi/transport_security_adapter.h',
- 'src/core/tsi/transport_security_grpc.h',
'src/core/tsi/transport_security_interface.h',
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/filters/client_channel/client_channel.h',
@@ -649,9 +649,9 @@ Pod::Spec.new do |s|
'src/core/tsi/fake_transport_security.c',
'src/core/tsi/gts_transport_security.c',
'src/core/tsi/ssl_transport_security.c',
+ 'src/core/tsi/transport_security_grpc.c',
'src/core/tsi/transport_security.c',
'src/core/tsi/transport_security_adapter.c',
- 'src/core/tsi/transport_security_grpc.c',
'src/core/ext/transport/chttp2/server/chttp2_server.c',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.c',
'src/core/ext/filters/client_channel/channel_connectivity.c',
@@ -784,9 +784,9 @@ Pod::Spec.new do |s|
'src/core/tsi/gts_transport_security.h',
'src/core/tsi/ssl_transport_security.h',
'src/core/tsi/ssl_types.h',
+ 'src/core/tsi/transport_security_grpc.h',
'src/core/tsi/transport_security.h',
'src/core/tsi/transport_security_adapter.h',
- 'src/core/tsi/transport_security_grpc.h',
'src/core/tsi/transport_security_interface.h',
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/filters/client_channel/client_channel.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 089d6d3214..f04a14167b 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -222,9 +222,9 @@ Gem::Specification.new do |s|
s.files += %w( src/core/tsi/gts_transport_security.h )
s.files += %w( src/core/tsi/ssl_transport_security.h )
s.files += %w( src/core/tsi/ssl_types.h )
+ s.files += %w( src/core/tsi/transport_security_grpc.h )
s.files += %w( src/core/tsi/transport_security.h )
s.files += %w( src/core/tsi/transport_security_adapter.h )
- s.files += %w( src/core/tsi/transport_security_grpc.h )
s.files += %w( src/core/tsi/transport_security_interface.h )
s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.h )
s.files += %w( src/core/ext/filters/client_channel/client_channel.h )
@@ -581,9 +581,9 @@ Gem::Specification.new do |s|
s.files += %w( src/core/tsi/fake_transport_security.c )
s.files += %w( src/core/tsi/gts_transport_security.c )
s.files += %w( src/core/tsi/ssl_transport_security.c )
+ s.files += %w( src/core/tsi/transport_security_grpc.c )
s.files += %w( src/core/tsi/transport_security.c )
s.files += %w( src/core/tsi/transport_security_adapter.c )
- s.files += %w( src/core/tsi/transport_security_grpc.c )
s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.c )
s.files += %w( src/core/ext/transport/chttp2/client/secure/secure_channel_create.c )
s.files += %w( src/core/ext/filters/client_channel/channel_connectivity.c )
diff --git a/package.xml b/package.xml
index 09f549832d..4e288b3c68 100644
--- a/package.xml
+++ b/package.xml
@@ -236,9 +236,9 @@
<file baseinstalldir="/" name="src/core/tsi/gts_transport_security.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/ssl_transport_security.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/ssl_types.h" role="src" />
+ <file baseinstalldir="/" name="src/core/tsi/transport_security_grpc.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.h" role="src" />
- <file baseinstalldir="/" name="src/core/tsi/transport_security_grpc.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security_interface.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/chttp2_server.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel.h" role="src" />
@@ -595,9 +595,9 @@
<file baseinstalldir="/" name="src/core/tsi/fake_transport_security.c" role="src" />
<file baseinstalldir="/" name="src/core/tsi/gts_transport_security.c" role="src" />
<file baseinstalldir="/" name="src/core/tsi/ssl_transport_security.c" role="src" />
+ <file baseinstalldir="/" name="src/core/tsi/transport_security_grpc.c" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security.c" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.c" role="src" />
- <file baseinstalldir="/" name="src/core/tsi/transport_security_grpc.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/chttp2_server.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/client/secure/secure_channel_create.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/channel_connectivity.c" role="src" />
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 365aa583bb..9472a8e520 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -42,6 +42,7 @@
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/support/murmur_hash.h"
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
@@ -239,22 +240,43 @@ struct grpc_pollset_set {
* condition variable polling definitions
*/
+#define POLLCV_THREAD_GRACE_MS 1000
#define CV_POLL_PERIOD_MS 1000
#define CV_DEFAULT_TABLE_SIZE 16
-typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t;
-
-typedef struct poll_args {
+typedef struct poll_result {
gpr_refcount refcount;
- gpr_cv *cv;
+ cv_node *watchers;
+ int watchcount;
struct pollfd *fds;
nfds_t nfds;
- int timeout;
int retval;
int err;
- gpr_atm status;
+ int completed;
+} poll_result;
+
+typedef struct poll_args {
+ gpr_cv trigger;
+ int trigger_set;
+ struct pollfd *fds;
+ nfds_t nfds;
+ poll_result *result;
+ struct poll_args *next;
+ struct poll_args *prev;
} poll_args;
+// This is a 2-tiered cache, we mantain a hash table
+// of active poll calls, so we can wait on the result
+// of that call. We also maintain a freelist of inactive
+// poll threads.
+typedef struct poll_hash_table {
+ poll_args *free_pollers;
+ poll_args **active_pollers;
+ unsigned int size;
+ unsigned int count;
+} poll_hash_table;
+
+poll_hash_table poll_cache;
cv_fd_table g_cvfds;
/*******************************************************************************
@@ -1277,43 +1299,205 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
* Condition Variable polling extensions
*/
-static void decref_poll_args(poll_args *args) {
- if (gpr_unref(&args->refcount)) {
- gpr_free(args->fds);
- gpr_cv_destroy(args->cv);
- gpr_free(args->cv);
- gpr_free(args);
+static void run_poll(void *args);
+static void cache_poller_locked(poll_args *args);
+
+static void cache_insert_locked(poll_args *args) {
+ uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd),
+ 0xDEADBEEF);
+ key = key % poll_cache.size;
+ if (poll_cache.active_pollers[key]) {
+ poll_cache.active_pollers[key]->prev = args;
}
+ args->next = poll_cache.active_pollers[key];
+ args->prev = NULL;
+ poll_cache.active_pollers[key] = args;
+ poll_cache.count++;
}
-// Poll in a background thread
-static void run_poll(void *arg) {
- int timeout, retval;
- poll_args *pargs = (poll_args *)arg;
- while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
- if (pargs->timeout < 0) {
- timeout = CV_POLL_PERIOD_MS;
- } else {
- timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout);
- pargs->timeout -= timeout;
+static void init_result(poll_args *pargs) {
+ pargs->result = gpr_malloc(sizeof(poll_result));
+ gpr_ref_init(&pargs->result->refcount, 1);
+ pargs->result->watchers = NULL;
+ pargs->result->watchcount = 0;
+ pargs->result->fds = gpr_malloc(sizeof(struct pollfd) * pargs->nfds);
+ memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds);
+ pargs->result->nfds = pargs->nfds;
+ pargs->result->retval = 0;
+ pargs->result->err = 0;
+ pargs->result->completed = 0;
+}
+
+// Creates a poll_args object for a given arguments to poll().
+// This object may return a poll_args in the cache.
+static poll_args *get_poller_locked(struct pollfd *fds, nfds_t count) {
+ uint32_t key =
+ gpr_murmur_hash3(fds, count * sizeof(struct pollfd), 0xDEADBEEF);
+ key = key % poll_cache.size;
+ poll_args *curr = poll_cache.active_pollers[key];
+ while (curr) {
+ if (curr->nfds == count &&
+ memcmp(curr->fds, fds, count * sizeof(struct pollfd)) == 0) {
+ gpr_free(fds);
+ return curr;
}
- retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout);
- if (retval != 0 || pargs->timeout == 0) {
- pargs->retval = retval;
- pargs->err = errno;
- break;
+ curr = curr->next;
+ }
+
+ if (poll_cache.free_pollers) {
+ poll_args *pargs = poll_cache.free_pollers;
+ poll_cache.free_pollers = pargs->next;
+ if (poll_cache.free_pollers) {
+ poll_cache.free_pollers->prev = NULL;
}
+ pargs->fds = fds;
+ pargs->nfds = count;
+ pargs->next = NULL;
+ pargs->prev = NULL;
+ init_result(pargs);
+ cache_poller_locked(pargs);
+ return pargs;
+ }
+
+ poll_args *pargs = gpr_malloc(sizeof(struct poll_args));
+ gpr_cv_init(&pargs->trigger);
+ pargs->fds = fds;
+ pargs->nfds = count;
+ pargs->next = NULL;
+ pargs->prev = NULL;
+ pargs->trigger_set = 0;
+ init_result(pargs);
+ cache_poller_locked(pargs);
+ gpr_thd_id t_id;
+ gpr_thd_options opt = gpr_thd_options_default();
+ gpr_ref(&g_cvfds.pollcount);
+ gpr_thd_options_set_detached(&opt);
+ GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
+ return pargs;
+}
+
+static void cache_delete_locked(poll_args *args) {
+ if (!args->prev) {
+ uint32_t key = gpr_murmur_hash3(
+ args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF);
+ key = key % poll_cache.size;
+ GPR_ASSERT(poll_cache.active_pollers[key] == args);
+ poll_cache.active_pollers[key] = args->next;
+ } else {
+ args->prev->next = args->next;
}
- gpr_mu_lock(&g_cvfds.mu);
- if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
- // Signal main thread that the poll completed
- gpr_atm_no_barrier_store(&pargs->status, COMPLETED);
- gpr_cv_signal(pargs->cv);
+
+ if (args->next) {
+ args->next->prev = args->prev;
}
- decref_poll_args(pargs);
- g_cvfds.pollcount--;
- if (g_cvfds.shutdown && g_cvfds.pollcount == 0) {
- gpr_cv_signal(&g_cvfds.shutdown_complete);
+
+ poll_cache.count--;
+ if (poll_cache.free_pollers) {
+ poll_cache.free_pollers->prev = args;
+ }
+ args->prev = NULL;
+ args->next = poll_cache.free_pollers;
+ gpr_free(args->fds);
+ poll_cache.free_pollers = args;
+}
+
+static void cache_poller_locked(poll_args *args) {
+ if (poll_cache.count + 1 > poll_cache.size / 2) {
+ poll_args **old_active_pollers = poll_cache.active_pollers;
+ poll_cache.size = poll_cache.size * 2;
+ poll_cache.count = 0;
+ poll_cache.active_pollers = gpr_malloc(sizeof(void *) * poll_cache.size);
+ for (unsigned int i = 0; i < poll_cache.size; i++) {
+ poll_cache.active_pollers[i] = NULL;
+ }
+ for (unsigned int i = 0; i < poll_cache.size / 2; i++) {
+ poll_args *curr = old_active_pollers[i];
+ poll_args *next = NULL;
+ while (curr) {
+ next = curr->next;
+ cache_insert_locked(curr);
+ curr = next;
+ }
+ }
+ gpr_free(old_active_pollers);
+ }
+
+ cache_insert_locked(args);
+}
+
+static void cache_destroy_locked(poll_args *args) {
+ if (args->next) {
+ args->next->prev = args->prev;
+ }
+
+ if (args->prev) {
+ args->prev->next = args->next;
+ } else {
+ poll_cache.free_pollers = args->next;
+ }
+
+ gpr_free(args);
+}
+
+static void decref_poll_result(poll_result *res) {
+ if (gpr_unref(&res->refcount)) {
+ GPR_ASSERT(!res->watchers);
+ gpr_free(res->fds);
+ gpr_free(res);
+ }
+}
+
+void remove_cvn(cv_node **head, cv_node *target) {
+ if (target->next) {
+ target->next->prev = target->prev;
+ }
+
+ if (target->prev) {
+ target->prev->next = target->next;
+ } else {
+ *head = target->next;
+ }
+}
+
+gpr_timespec thread_grace;
+
+// Poll in a background thread
+static void run_poll(void *args) {
+ poll_args *pargs = (poll_args *)args;
+ while (1) {
+ poll_result *result = pargs->result;
+ int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS);
+ gpr_mu_lock(&g_cvfds.mu);
+ if (retval != 0) {
+ result->completed = 1;
+ result->retval = retval;
+ result->err = errno;
+ cv_node *watcher = result->watchers;
+ while (watcher) {
+ gpr_cv_signal(watcher->cv);
+ watcher = watcher->next;
+ }
+ }
+ if (result->watchcount == 0 || result->completed) {
+ cache_delete_locked(pargs);
+ decref_poll_result(result);
+ // Leave this polling thread alive for a grace period to do another poll()
+ // op
+ gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
+ deadline = gpr_time_add(deadline, thread_grace);
+ pargs->trigger_set = 0;
+ gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline);
+ if (!pargs->trigger_set) {
+ cache_destroy_locked(pargs);
+ break;
+ }
+ }
+ gpr_mu_unlock(&g_cvfds.mu);
+ }
+
+ // We still have the lock here
+ if (gpr_unref(&g_cvfds.pollcount)) {
+ gpr_cv_signal(&g_cvfds.shutdown_cv);
}
gpr_mu_unlock(&g_cvfds.mu);
}
@@ -1322,24 +1506,29 @@ static void run_poll(void *arg) {
static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
unsigned int i;
int res, idx;
- gpr_cv *pollcv;
- cv_node *cvn, *prev;
+ cv_node *pollcv;
int skip_poll = 0;
nfds_t nsockfds = 0;
- gpr_thd_id t_id;
- gpr_thd_options opt;
- poll_args *pargs = NULL;
+ poll_result *result = NULL;
gpr_mu_lock(&g_cvfds.mu);
- pollcv = gpr_malloc(sizeof(gpr_cv));
- gpr_cv_init(pollcv);
+ pollcv = gpr_malloc(sizeof(cv_node));
+ pollcv->next = NULL;
+ gpr_cv pollcv_cv;
+ gpr_cv_init(&pollcv_cv);
+ pollcv->cv = &pollcv_cv;
+ cv_node *fd_cvs = gpr_malloc(nfds * sizeof(cv_node));
+
for (i = 0; i < nfds; i++) {
fds[i].revents = 0;
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
idx = FD_TO_IDX(fds[i].fd);
- cvn = gpr_malloc(sizeof(cv_node));
- cvn->cv = pollcv;
- cvn->next = g_cvfds.cvfds[idx].cvs;
- g_cvfds.cvfds[idx].cvs = cvn;
+ fd_cvs[i].cv = &pollcv_cv;
+ fd_cvs[i].prev = NULL;
+ fd_cvs[i].next = g_cvfds.cvfds[idx].cvs;
+ if (g_cvfds.cvfds[idx].cvs) {
+ g_cvfds.cvfds[idx].cvs->prev = &(fd_cvs[i]);
+ }
+ g_cvfds.cvfds[idx].cvs = &(fd_cvs[i]);
// Don't bother polling if a wakeup fd is ready
if (g_cvfds.cvfds[idx].is_set) {
skip_poll = 1;
@@ -1349,81 +1538,68 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
}
}
+ gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
+ if (timeout < 0) {
+ deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ } else {
+ deadline =
+ gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
+ }
+
res = 0;
if (!skip_poll && nsockfds > 0) {
- pargs = gpr_malloc(sizeof(struct poll_args));
- // Both the main thread and calling thread get a reference
- gpr_ref_init(&pargs->refcount, 2);
- pargs->cv = pollcv;
- pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
- pargs->nfds = nsockfds;
- pargs->timeout = timeout;
- pargs->retval = 0;
- pargs->err = 0;
- gpr_atm_no_barrier_store(&pargs->status, INPROGRESS);
+ struct pollfd *pollfds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd >= 0) {
- pargs->fds[idx].fd = fds[i].fd;
- pargs->fds[idx].events = fds[i].events;
- pargs->fds[idx].revents = 0;
+ pollfds[idx].fd = fds[i].fd;
+ pollfds[idx].events = fds[i].events;
+ pollfds[idx].revents = 0;
idx++;
}
}
- g_cvfds.pollcount++;
- opt = gpr_thd_options_default();
- gpr_thd_options_set_detached(&opt);
- GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
- // We want the poll() thread to trigger the deadline, so wait forever here
- gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
- if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
- res = pargs->retval;
- errno = pargs->err;
- } else {
- errno = 0;
- gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
+ poll_args *pargs = get_poller_locked(pollfds, nsockfds);
+ result = pargs->result;
+ pollcv->next = result->watchers;
+ pollcv->prev = NULL;
+ if (result->watchers) {
+ result->watchers->prev = pollcv;
}
+ result->watchers = pollcv;
+ result->watchcount++;
+ gpr_ref(&result->refcount);
+
+ pargs->trigger_set = 1;
+ gpr_cv_signal(&pargs->trigger);
+ gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
+ res = result->retval;
+ errno = result->err;
+ result->watchcount--;
+ remove_cvn(&result->watchers, pollcv);
} else if (!skip_poll) {
- gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
- deadline =
- gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
- gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
+ gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
}
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
- cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs;
- prev = NULL;
- while (cvn->cv != pollcv) {
- prev = cvn;
- cvn = cvn->next;
- GPR_ASSERT(cvn);
- }
- if (!prev) {
- g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next;
- } else {
- prev->next = cvn->next;
- }
- gpr_free(cvn);
-
+ remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
fds[i].revents = POLLIN;
if (res >= 0) res++;
}
- } else if (!skip_poll && fds[i].fd >= 0 &&
- gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
- fds[i].revents = pargs->fds[idx].revents;
+ } else if (!skip_poll && fds[i].fd >= 0 && result->completed) {
+ fds[i].revents = result->fds[idx].revents;
idx++;
}
}
- if (pargs) {
- decref_poll_args(pargs);
- } else {
- gpr_cv_destroy(pollcv);
- gpr_free(pollcv);
+ gpr_free(fd_cvs);
+ gpr_free(pollcv);
+ if (result) {
+ decref_poll_result(result);
}
+
gpr_mu_unlock(&g_cvfds.mu);
return res;
@@ -1432,12 +1608,12 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
static void global_cv_fd_table_init() {
gpr_mu_init(&g_cvfds.mu);
gpr_mu_lock(&g_cvfds.mu);
- gpr_cv_init(&g_cvfds.shutdown_complete);
- g_cvfds.shutdown = 0;
- g_cvfds.pollcount = 0;
+ gpr_cv_init(&g_cvfds.shutdown_cv);
+ gpr_ref_init(&g_cvfds.pollcount, 1);
g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
g_cvfds.free_fds = NULL;
+ thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN);
for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
g_cvfds.cvfds[i].is_set = 0;
g_cvfds.cvfds[i].cvs = NULL;
@@ -1447,23 +1623,35 @@ static void global_cv_fd_table_init() {
// Override the poll function with one that supports cvfds
g_cvfds.poll = grpc_poll_function;
grpc_poll_function = &cvfd_poll;
+
+ // Initialize the cache
+ poll_cache.size = 32;
+ poll_cache.count = 0;
+ poll_cache.free_pollers = NULL;
+ poll_cache.active_pollers = gpr_malloc(sizeof(void *) * 32);
+ for (unsigned int i = 0; i < poll_cache.size; i++) {
+ poll_cache.active_pollers[i] = NULL;
+ }
+
gpr_mu_unlock(&g_cvfds.mu);
}
static void global_cv_fd_table_shutdown() {
gpr_mu_lock(&g_cvfds.mu);
- g_cvfds.shutdown = 1;
// Attempt to wait for all abandoned poll() threads to terminate
// Not doing so will result in reported memory leaks
- if (g_cvfds.pollcount > 0) {
- int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu,
+ if (!gpr_unref(&g_cvfds.pollcount)) {
+ int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(3, GPR_TIMESPAN)));
GPR_ASSERT(res == 0);
}
- gpr_cv_destroy(&g_cvfds.shutdown_complete);
+ gpr_cv_destroy(&g_cvfds.shutdown_cv);
grpc_poll_function = g_cvfds.poll;
gpr_free(g_cvfds.cvfds);
+
+ gpr_free(poll_cache.active_pollers);
+
gpr_mu_unlock(&g_cvfds.mu);
gpr_mu_destroy(&g_cvfds.mu);
}
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h
index c5dcdc9746..46e84f5843 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.h
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -43,6 +43,7 @@
typedef struct cv_node {
gpr_cv* cv;
struct cv_node* next;
+ struct cv_node* prev;
} cv_node;
typedef struct fd_node {
@@ -53,9 +54,8 @@ typedef struct fd_node {
typedef struct cv_fd_table {
gpr_mu mu;
- int pollcount;
- int shutdown;
- gpr_cv shutdown_complete;
+ gpr_refcount pollcount;
+ gpr_cv shutdown_cv;
fd_node* cvfds;
fd_node* free_fds;
unsigned int size;
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index ffa941bb9e..c59e55136c 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -109,6 +109,8 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx,
(grpc_oauth2_token_fetcher_credentials *)creds;
GRPC_MDELEM_UNREF(exec_ctx, c->access_token_md);
gpr_mu_destroy(&c->mu);
+ grpc_pollset_set_destroy(exec_ctx,
+ grpc_polling_entity_pollset_set(&c->pollent));
grpc_httpcli_context_destroy(exec_ctx, &c->httpcli_context);
}
@@ -238,6 +240,9 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx,
"Error occured when fetching oauth2 token.", &error, 1);
}
GRPC_CLOSURE_SCHED(exec_ctx, pending_request->on_request_metadata, error);
+ grpc_polling_entity_del_from_pollset_set(
+ exec_ctx, pending_request->pollent,
+ grpc_polling_entity_pollset_set(&c->pollent));
grpc_oauth2_pending_get_request_metadata *prev = pending_request;
pending_request = pending_request->next;
gpr_free(prev);
@@ -278,6 +283,9 @@ static bool oauth2_token_fetcher_get_request_metadata(
sizeof(*pending_request));
pending_request->md_array = md_array;
pending_request->on_request_metadata = on_request_metadata;
+ pending_request->pollent = pollent;
+ grpc_polling_entity_add_to_pollset_set(
+ exec_ctx, pollent, grpc_polling_entity_pollset_set(&c->pollent));
pending_request->next = c->pending_requests;
c->pending_requests = pending_request;
bool start_fetch = false;
@@ -289,7 +297,7 @@ static bool oauth2_token_fetcher_get_request_metadata(
if (start_fetch) {
grpc_call_credentials_ref(creds);
c->fetch_func(exec_ctx, grpc_credentials_metadata_request_create(creds),
- &c->httpcli_context, pollent,
+ &c->httpcli_context, &c->pollent,
on_oauth2_token_fetcher_http_response,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold));
}
@@ -334,6 +342,8 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c,
gpr_mu_init(&c->mu);
c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME);
c->fetch_func = fetch_func;
+ c->pollent =
+ grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create());
grpc_httpcli_context_init(&c->httpcli_context);
}
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
index 9d041a20ea..d9ad6691b8 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
@@ -62,6 +62,7 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_exec_ctx *exec_ctx,
typedef struct grpc_oauth2_pending_get_request_metadata {
grpc_credentials_mdelem_array *md_array;
grpc_closure *on_request_metadata;
+ grpc_polling_entity *pollent;
struct grpc_oauth2_pending_get_request_metadata *next;
} grpc_oauth2_pending_get_request_metadata;
@@ -74,6 +75,7 @@ typedef struct {
grpc_oauth2_pending_get_request_metadata *pending_requests;
grpc_httpcli_context httpcli_context;
grpc_fetch_oauth2_func fetch_func;
+ grpc_polling_entity pollent;
} grpc_oauth2_token_fetcher_credentials;
// Google refresh token credentials.
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index c74d04c829..0cb9288131 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -92,9 +92,33 @@ namespace Grpc.Core.Tests
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
+ Assert.AreEqual(0, ex.Trailers.Count);
var ex2 = Assert.ThrowsAsync<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode);
+ Assert.AreEqual(0, ex.Trailers.Count);
+ }
+
+ [Test]
+ public void UnaryCall_ServerHandlerThrowsRpcExceptionWithTrailers()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
+ {
+ var trailers = new Metadata { {"xyz", "xyz-value"} };
+ throw new RpcException(new Status(StatusCode.Unauthenticated, ""), trailers);
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
+ Assert.AreEqual(1, ex.Trailers.Count);
+ Assert.AreEqual("xyz", ex.Trailers[0].Key);
+ Assert.AreEqual("xyz-value", ex.Trailers[0].Value);
+
+ var ex2 = Assert.ThrowsAsync<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode);
+ Assert.AreEqual(1, ex2.Trailers.Count);
+ Assert.AreEqual("xyz", ex2.Trailers[0].Key);
+ Assert.AreEqual("xyz-value", ex2.Trailers[0].Value);
}
[Test]
@@ -108,9 +132,34 @@ namespace Grpc.Core.Tests
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
+ Assert.AreEqual(0, ex.Trailers.Count);
+
+ var ex2 = Assert.ThrowsAsync<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode);
+ Assert.AreEqual(0, ex2.Trailers.Count);
+ }
+
+ [Test]
+ public void UnaryCall_ServerHandlerSetsStatusAndTrailers()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ context.Status = new Status(StatusCode.Unauthenticated, "");
+ context.ResponseTrailers.Add("xyz", "xyz-value");
+ return "";
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
+ Assert.AreEqual(1, ex.Trailers.Count);
+ Assert.AreEqual("xyz", ex.Trailers[0].Key);
+ Assert.AreEqual("xyz-value", ex.Trailers[0].Value);
var ex2 = Assert.ThrowsAsync<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode);
+ Assert.AreEqual(1, ex2.Trailers.Count);
+ Assert.AreEqual("xyz", ex2.Trailers[0].Key);
+ Assert.AreEqual("xyz-value", ex2.Trailers[0].Value);
}
[Test]
@@ -148,7 +197,7 @@ namespace Grpc.Core.Tests
CollectionAssert.AreEqual(new string[] { "A", "B", "C" }, await call.ResponseStream.ToListAsync());
Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode);
- Assert.IsNotNull("xyz", call.GetTrailers()[0].Key);
+ Assert.AreEqual("xyz", call.GetTrailers()[0].Key);
}
[Test]
@@ -183,6 +232,27 @@ namespace Grpc.Core.Tests
}
[Test]
+ public async Task ServerStreamingCall_TrailersFromMultipleSourcesGetConcatenated()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ context.ResponseTrailers.Add("xyz", "xyz-value");
+ throw new RpcException(new Status(StatusCode.InvalidArgument, ""), new Metadata { {"abc", "abc-value"} });
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
+ Assert.AreEqual(StatusCode.InvalidArgument, ex.Status.StatusCode);
+ Assert.AreEqual(2, call.GetTrailers().Count);
+ Assert.AreEqual(2, ex.Trailers.Count);
+ Assert.AreEqual("xyz", ex.Trailers[0].Key);
+ Assert.AreEqual("xyz-value", ex.Trailers[0].Value);
+ Assert.AreEqual("abc", ex.Trailers[1].Key);
+ Assert.AreEqual("abc-value", ex.Trailers[1].Value);
+ }
+
+ [Test]
public async Task DuplexStreamingCall()
{
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
@@ -199,7 +269,7 @@ namespace Grpc.Core.Tests
CollectionAssert.AreEqual(new string[] { "A", "B", "C" }, await call.ResponseStream.ToListAsync());
Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode);
- Assert.IsNotNull("xyz-value", call.GetTrailers()[0].Value);
+ Assert.AreEqual("xyz-value", call.GetTrailers()[0].Value);
}
[Test]
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 0f3a82c605..fc9d5599f2 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -18,6 +18,7 @@
using System;
using System.Linq;
+using System.Threading;
using Grpc.Core;
using NUnit.Framework;
@@ -75,5 +76,19 @@ namespace Grpc.Core.Tests
var parts = coreVersion.Split('.');
Assert.AreEqual(3, parts.Length);
}
+
+ [Test]
+ public void ShuttingDownEventIsFired()
+ {
+ var cts = new CancellationTokenSource();
+ var handler = new EventHandler((sender, args) => { cts.Cancel(); });
+
+ GrpcEnvironment.ShuttingDown += handler;
+ var env = GrpcEnvironment.AddRef();
+ GrpcEnvironment.ReleaseAsync().Wait();
+ GrpcEnvironment.ShuttingDown -= handler;
+
+ Assert.IsTrue(cts.Token.IsCancellationRequested);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 0663ee9215..cbc7d2078c 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -49,7 +49,7 @@ namespace Grpc.Core
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
- bool isClosed;
+ bool isShutdown;
/// <summary>
/// Returns a reference-counted instance of initialized gRPC environment.
@@ -238,6 +238,12 @@ namespace Grpc.Core
}
/// <summary>
+ /// Occurs when <c>GrpcEnvironment</c> is about the start the shutdown logic.
+ /// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first).
+ /// </summary>
+ public static event EventHandler ShuttingDown;
+
+ /// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
@@ -311,13 +317,16 @@ namespace Grpc.Core
/// </summary>
private async Task ShutdownAsync()
{
- if (isClosed)
+ if (isShutdown)
{
- throw new InvalidOperationException("Close has already been called");
+ throw new InvalidOperationException("ShutdownAsync has already been called");
}
+
+ await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false);
+
await threadPool.StopAsync().ConfigureAwait(false);
GrpcNativeShutdown();
- isClosed = true;
+ isShutdown = true;
debugStats.CheckOK();
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 6e6ca7cd53..17109de587 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -329,7 +329,7 @@ namespace Grpc.Core.Internal
protected override Exception GetRpcExceptionClientOnly()
{
- return new RpcException(finishedStatus.Value.Status);
+ return new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers);
}
protected override Task CheckSendAllowedOrEarlyResult()
@@ -348,7 +348,7 @@ namespace Grpc.Core.Internal
// Writing after the call has finished is not a programming error because server can close
// the call anytime, so don't throw directly, but let the write task finish with an error.
var tcs = new TaskCompletionSource<object>();
- tcs.SetException(new RpcException(finishedStatus.Value.Status));
+ tcs.SetException(new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers));
return tcs.Task;
}
@@ -468,7 +468,7 @@ namespace Grpc.Core.Internal
var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
- unaryResponseTcs.SetException(new RpcException(status));
+ unaryResponseTcs.SetException(new RpcException(status, receivedStatus.Trailers));
return;
}
@@ -506,7 +506,7 @@ namespace Grpc.Core.Internal
var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
- streamingResponseCallFinishedTcs.SetException(new RpcException(status));
+ streamingResponseCallFinishedTcs.SetException(new RpcException(status, receivedStatus.Trailers));
return;
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 36702a3fab..6019f8e793 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -76,7 +76,7 @@ namespace Grpc.Core.Internal
{
Logger.Warning(e, "Exception occured in handler.");
}
- status = HandlerUtils.StatusFromException(e);
+ status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
}
try
{
@@ -133,7 +133,7 @@ namespace Grpc.Core.Internal
{
Logger.Warning(e, "Exception occured in handler.");
}
- status = HandlerUtils.StatusFromException(e);
+ status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
}
try
@@ -191,7 +191,7 @@ namespace Grpc.Core.Internal
{
Logger.Warning(e, "Exception occured in handler.");
}
- status = HandlerUtils.StatusFromException(e);
+ status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
}
try
@@ -247,7 +247,7 @@ namespace Grpc.Core.Internal
{
Logger.Warning(e, "Exception occured in handler.");
}
- status = HandlerUtils.StatusFromException(e);
+ status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
}
try
{
@@ -292,11 +292,20 @@ namespace Grpc.Core.Internal
internal static class HandlerUtils
{
- public static Status StatusFromException(Exception e)
+ public static Status GetStatusFromExceptionAndMergeTrailers(Exception e, Metadata callContextResponseTrailers)
{
var rpcException = e as RpcException;
if (rpcException != null)
{
+ // There are two sources of metadata entries on the server-side:
+ // 1. serverCallContext.ResponseTrailers
+ // 2. trailers in RpcException thrown by user code in server side handler.
+ // As metadata allows duplicate keys, the logical thing to do is
+ // to just merge trailers from RpcException into serverCallContext.ResponseTrailers.
+ foreach (var entry in rpcException.Trailers)
+ {
+ callContextResponseTrailers.Add(entry);
+ }
// use the status thrown by handler.
return rpcException.Status;
}
diff --git a/src/csharp/Grpc.Core/RpcException.cs b/src/csharp/Grpc.Core/RpcException.cs
index 01b9e4fb1a..d2c912e73a 100644
--- a/src/csharp/Grpc.Core/RpcException.cs
+++ b/src/csharp/Grpc.Core/RpcException.cs
@@ -17,6 +17,7 @@
#endregion
using System;
+using Grpc.Core.Utils;
namespace Grpc.Core
{
@@ -26,6 +27,7 @@ namespace Grpc.Core
public class RpcException : Exception
{
private readonly Status status;
+ private readonly Metadata trailers;
/// <summary>
/// Creates a new <c>RpcException</c> associated with given status.
@@ -34,6 +36,7 @@ namespace Grpc.Core
public RpcException(Status status) : base(status.ToString())
{
this.status = status;
+ this.trailers = Metadata.Empty;
}
/// <summary>
@@ -44,6 +47,18 @@ namespace Grpc.Core
public RpcException(Status status, string message) : base(message)
{
this.status = status;
+ this.trailers = Metadata.Empty;
+ }
+
+ /// <summary>
+ /// Creates a new <c>RpcException</c> associated with given status and trailing response metadata.
+ /// </summary>
+ /// <param name="status">Resulting status of a call.</param>
+ /// <param name="trailers">Response trailing metadata.</param>
+ public RpcException(Status status, Metadata trailers) : base(status.ToString())
+ {
+ this.status = status;
+ this.trailers = GrpcPreconditions.CheckNotNull(trailers);
}
/// <summary>
@@ -56,5 +71,18 @@ namespace Grpc.Core
return status;
}
}
+
+ /// <summary>
+ /// Gets the call trailing metadata.
+ /// Trailers only have meaningful content for client-side calls (in which case they represent the trailing metadata sent by the server when closing the call).
+ /// Instances of <c>RpcException</c> thrown by the server-side part of the stack will have trailers always set to empty.
+ /// </summary>
+ public Metadata Trailers
+ {
+ get
+ {
+ return trailers;
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/CustomErrorDetailsTest.cs b/src/csharp/Grpc.IntegrationTesting/CustomErrorDetailsTest.cs
index be996f91e0..374c6fc23f 100644
--- a/src/csharp/Grpc.IntegrationTesting/CustomErrorDetailsTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/CustomErrorDetailsTest.cs
@@ -65,7 +65,7 @@ namespace Grpc.IntegrationTesting
}
[Test]
- public async Task UnaryCall()
+ public async Task ErrorDetailsFromCallObject()
{
var call = client.UnaryCallAsync(new SimpleRequest { ResponseSize = 10 });
@@ -83,7 +83,24 @@ namespace Grpc.IntegrationTesting
}
}
- private DebugInfo GetDebugInfo(Metadata trailers)
+ [Test]
+ public async Task ErrorDetailsFromRpcException()
+ {
+ try
+ {
+ await client.UnaryCallAsync(new SimpleRequest { ResponseSize = 10 });
+ Assert.Fail();
+ }
+ catch (RpcException e)
+ {
+ Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
+ var debugInfo = GetDebugInfo(e.Trailers);
+ Assert.AreEqual(debugInfo.Detail, ExceptionDetail);
+ Assert.IsNotEmpty(debugInfo.StackEntries);
+ }
+ }
+
+ private static DebugInfo GetDebugInfo(Metadata trailers)
{
var entry = trailers.First((e) => e.Key == DebugInfoTrailerName);
return DebugInfo.Parser.ParseFrom(entry.ValueBytes);
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index bf831d063b..dc4d28f95b 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -246,9 +246,9 @@ CORE_SOURCE_FILES = [
'src/core/tsi/fake_transport_security.c',
'src/core/tsi/gts_transport_security.c',
'src/core/tsi/ssl_transport_security.c',
+ 'src/core/tsi/transport_security_grpc.c',
'src/core/tsi/transport_security.c',
'src/core/tsi/transport_security_adapter.c',
- 'src/core/tsi/transport_security_grpc.c',
'src/core/ext/transport/chttp2/server/chttp2_server.c',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.c',
'src/core/ext/filters/client_channel/channel_connectivity.c',
diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c
index e60e398767..441c431135 100644
--- a/test/core/security/credentials_test.c
+++ b/test/core/security/credentials_test.c
@@ -311,6 +311,7 @@ typedef struct {
grpc_credentials_mdelem_array md_array;
grpc_closure on_request_metadata;
grpc_call_credentials *creds;
+ grpc_polling_entity pollent;
} request_metadata_state;
static void check_metadata(const expected_md *expected,
@@ -355,6 +356,8 @@ static void check_request_metadata(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(state->md_array.size == state->expected_size);
check_metadata(state->expected, &state->md_array);
grpc_credentials_mdelem_array_destroy(exec_ctx, &state->md_array);
+ grpc_pollset_set_destroy(exec_ctx,
+ grpc_polling_entity_pollset_set(&state->pollent));
gpr_free(state);
}
@@ -365,6 +368,8 @@ static request_metadata_state *make_request_metadata_state(
state->expected_error = expected_error;
state->expected = expected;
state->expected_size = expected_size;
+ state->pollent =
+ grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create());
GRPC_CLOSURE_INIT(&state->on_request_metadata, check_request_metadata, state,
grpc_schedule_on_exec_ctx);
return state;
@@ -376,7 +381,7 @@ static void run_request_metadata_test(grpc_exec_ctx *exec_ctx,
request_metadata_state *state) {
grpc_error *error = GRPC_ERROR_NONE;
if (grpc_call_credentials_get_request_metadata(
- exec_ctx, creds, NULL, auth_md_ctx, &state->md_array,
+ exec_ctx, creds, &state->pollent, auth_md_ctx, &state->md_array,
&state->on_request_metadata, &error)) {
// Synchronous result. Invoke the callback directly.
check_request_metadata(exec_ctx, state, error);
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 122976c397..8b00bcfeaf 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -550,8 +550,7 @@ static Status ProcessGenericRPC(const PayloadConfig &payload_config,
ByteBuffer *response) {
int resp_size = payload_config.bytebuf_params().resp_size();
std::unique_ptr<char[]> buf(new char[resp_size]);
- grpc_slice s = grpc_slice_from_copied_buffer(buf.get(), resp_size);
- Slice slice(s, Slice::STEAL_REF);
+ Slice slice(buf.get(), resp_size);
*response = ByteBuffer(&slice, 1);
return Status::OK;
}
diff --git a/tools/internal_ci/windows/grpc_portability_master.cfg b/tools/internal_ci/windows/grpc_portability_build_only.cfg
index c395cb4a94..b2b58ece2d 100644
--- a/tools/internal_ci/windows/grpc_portability_master.cfg
+++ b/tools/internal_ci/windows/grpc_portability_build_only.cfg
@@ -26,5 +26,5 @@ action {
env_vars {
key: "RUN_TESTS_FLAGS"
- value: "-f portability windows -j 1 --inner_jobs 8 --internal_ci"
+ value: "-f portability windows --internal_ci --build_only"
}
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index c1d68d273f..30fef6fc0a 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -9095,17 +9095,15 @@
"deps": [
"gpr",
"grpc_base",
- "grpc_trace"
+ "grpc_trace",
+ "tsi_interface"
],
"headers": [
"src/core/tsi/fake_transport_security.h",
"src/core/tsi/gts_transport_security.h",
"src/core/tsi/ssl_transport_security.h",
"src/core/tsi/ssl_types.h",
- "src/core/tsi/transport_security.h",
- "src/core/tsi/transport_security_adapter.h",
- "src/core/tsi/transport_security_grpc.h",
- "src/core/tsi/transport_security_interface.h"
+ "src/core/tsi/transport_security_grpc.h"
],
"is_filegroup": true,
"language": "c",
@@ -9118,12 +9116,30 @@
"src/core/tsi/ssl_transport_security.c",
"src/core/tsi/ssl_transport_security.h",
"src/core/tsi/ssl_types.h",
+ "src/core/tsi/transport_security_grpc.c",
+ "src/core/tsi/transport_security_grpc.h"
+ ],
+ "third_party": false,
+ "type": "filegroup"
+ },
+ {
+ "deps": [
+ "gpr",
+ "grpc_trace"
+ ],
+ "headers": [
+ "src/core/tsi/transport_security.h",
+ "src/core/tsi/transport_security_adapter.h",
+ "src/core/tsi/transport_security_interface.h"
+ ],
+ "is_filegroup": true,
+ "language": "c",
+ "name": "tsi_interface",
+ "src": [
"src/core/tsi/transport_security.c",
"src/core/tsi/transport_security.h",
"src/core/tsi/transport_security_adapter.c",
"src/core/tsi/transport_security_adapter.h",
- "src/core/tsi/transport_security_grpc.c",
- "src/core/tsi/transport_security_grpc.h",
"src/core/tsi/transport_security_interface.h"
],
"third_party": false,
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj
index 6eab48c62b..8a659280a4 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj
@@ -347,9 +347,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\gts_transport_security.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\ssl_transport_security.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\ssl_types.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_adapter.h" />
- <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_interface.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\server\chttp2_server.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\client_channel.h" />
@@ -893,12 +893,12 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\tsi\ssl_transport_security.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_adapter.c">
</ClCompile>
- <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.c">
- </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\server\chttp2_server.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\client\secure\secure_channel_create.c">
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
index eaae1c6693..6879047f55 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
@@ -556,13 +556,13 @@
<ClCompile Include="$(SolutionDir)\..\src\core\tsi\ssl_transport_security.c">
<Filter>src\core\tsi</Filter>
</ClCompile>
- <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security.c">
+ <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.c">
<Filter>src\core\tsi</Filter>
</ClCompile>
- <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_adapter.c">
+ <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security.c">
<Filter>src\core\tsi</Filter>
</ClCompile>
- <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.c">
+ <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_adapter.c">
<Filter>src\core\tsi</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\server\chttp2_server.c">
@@ -1013,13 +1013,13 @@
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\ssl_types.h">
<Filter>src\core\tsi</Filter>
</ClInclude>
- <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security.h">
+ <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.h">
<Filter>src\core\tsi</Filter>
</ClInclude>
- <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_adapter.h">
+ <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security.h">
<Filter>src\core\tsi</Filter>
</ClInclude>
- <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.h">
+ <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_adapter.h">
<Filter>src\core\tsi</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_interface.h">