aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD28
-rw-r--r--CMakeLists.txt2
-rw-r--r--Makefile3
-rw-r--r--binding.gyp1
-rw-r--r--build.yaml21
-rw-r--r--config.m41
-rw-r--r--config.w321
-rw-r--r--doc/load-balancing.md4
-rw-r--r--gRPC-Core.podspec3
-rw-r--r--[-rwxr-xr-x]grpc.gemspec2
-rw-r--r--include/grpc/impl/codegen/byte_buffer_reader.h2
-rw-r--r--include/grpc/impl/codegen/compression_types.h4
-rw-r--r--include/grpc/impl/codegen/grpc_types.h32
-rw-r--r--include/grpc/impl/codegen/slice.h6
-rw-r--r--package.xml2
-rw-r--r--src/core/ext/census/tracing.c1
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c398
-rw-r--r--src/core/lib/iomgr/exec_ctx.c50
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.h6
-rw-r--r--src/core/lib/security/transport/security_handshaker.c2
-rw-r--r--src/core/lib/surface/completion_queue.c76
-rw-r--r--src/core/tsi/fake_transport_security.c6
-rw-r--r--src/core/tsi/transport_security.c87
-rw-r--r--src/core/tsi/transport_security.h7
-rw-r--r--src/core/tsi/transport_security_adapter.c9
-rw-r--r--src/core/tsi/transport_security_grpc.c64
-rw-r--r--src/core/tsi/transport_security_grpc.h80
-rw-r--r--src/core/tsi/transport_security_interface.h11
-rw-r--r--src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs15
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs17
-rw-r--r--src/node/ext/call.cc10
-rw-r--r--src/node/test/call_test.js97
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
-rw-r--r--tools/doxygen/Doxyfile.core.internal2
-rw-r--r--tools/run_tests/generated/sources_and_headers.json27
-rwxr-xr-xtools/run_tests/run_build_statistics.py15
-rwxr-xr-xtools/run_tests/sanity/core_untyped_structs.sh27
-rw-r--r--tools/run_tests/sanity/sanity_tests.yaml1
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj.filters6
40 files changed, 858 insertions, 272 deletions
diff --git a/BUILD b/BUILD
index e01313f35d..49c340d070 100644
--- a/BUILD
+++ b/BUILD
@@ -1412,31 +1412,45 @@ grpc_cc_library(
)
grpc_cc_library(
+ name = "tsi_interface",
+ srcs = [
+ "src/core/tsi/transport_security.c",
+ "src/core/tsi/transport_security_adapter.c",
+ ],
+ hdrs = [
+ "src/core/tsi/transport_security.h",
+ "src/core/tsi/transport_security_adapter.h",
+ "src/core/tsi/transport_security_interface.h",
+ ],
+ language = "c",
+ deps = [
+ "gpr",
+ "grpc_trace",
+ ],
+)
+
+grpc_cc_library(
name = "tsi",
srcs = [
"src/core/tsi/fake_transport_security.c",
"src/core/tsi/gts_transport_security.c",
"src/core/tsi/ssl_transport_security.c",
- "src/core/tsi/transport_security.c",
- "src/core/tsi/transport_security_adapter.c",
+ "src/core/tsi/transport_security_grpc.c",
],
hdrs = [
"src/core/tsi/fake_transport_security.h",
"src/core/tsi/gts_transport_security.h",
"src/core/tsi/ssl_transport_security.h",
"src/core/tsi/ssl_types.h",
- "src/core/tsi/transport_security.h",
- "src/core/tsi/transport_security_adapter.h",
- "src/core/tsi/transport_security_interface.h",
+ "src/core/tsi/transport_security_grpc.h",
],
external_deps = [
"libssl",
],
language = "c",
deps = [
- "gpr",
"grpc_base",
- "grpc_trace",
+ "tsi_interface",
],
)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 743b34f11a..8dc4758d23 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1132,6 +1132,7 @@ add_library(grpc
src/core/tsi/fake_transport_security.c
src/core/tsi/gts_transport_security.c
src/core/tsi/ssl_transport_security.c
+ src/core/tsi/transport_security_grpc.c
src/core/tsi/transport_security.c
src/core/tsi/transport_security_adapter.c
src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -1503,6 +1504,7 @@ add_library(grpc_cronet
src/core/tsi/fake_transport_security.c
src/core/tsi/gts_transport_security.c
src/core/tsi/ssl_transport_security.c
+ src/core/tsi/transport_security_grpc.c
src/core/tsi/transport_security.c
src/core/tsi/transport_security_adapter.c
src/core/ext/transport/chttp2/client/chttp2_connector.c
diff --git a/Makefile b/Makefile
index af0ca3f126..74f05f592d 100644
--- a/Makefile
+++ b/Makefile
@@ -3079,6 +3079,7 @@ LIBGRPC_SRC = \
src/core/tsi/fake_transport_security.c \
src/core/tsi/gts_transport_security.c \
src/core/tsi/ssl_transport_security.c \
+ src/core/tsi/transport_security_grpc.c \
src/core/tsi/transport_security.c \
src/core/tsi/transport_security_adapter.c \
src/core/ext/transport/chttp2/server/chttp2_server.c \
@@ -3448,6 +3449,7 @@ LIBGRPC_CRONET_SRC = \
src/core/tsi/fake_transport_security.c \
src/core/tsi/gts_transport_security.c \
src/core/tsi/ssl_transport_security.c \
+ src/core/tsi/transport_security_grpc.c \
src/core/tsi/transport_security.c \
src/core/tsi/transport_security_adapter.c \
src/core/ext/transport/chttp2/client/chttp2_connector.c \
@@ -19721,6 +19723,7 @@ src/core/tsi/gts_transport_security.c: $(OPENSSL_DEP)
src/core/tsi/ssl_transport_security.c: $(OPENSSL_DEP)
src/core/tsi/transport_security.c: $(OPENSSL_DEP)
src/core/tsi/transport_security_adapter.c: $(OPENSSL_DEP)
+src/core/tsi/transport_security_grpc.c: $(OPENSSL_DEP)
src/cpp/client/cronet_credentials.cc: $(OPENSSL_DEP)
src/cpp/client/secure_credentials.cc: $(OPENSSL_DEP)
src/cpp/common/auth_property_iterator.cc: $(OPENSSL_DEP)
diff --git a/binding.gyp b/binding.gyp
index 2ca36e8f69..bbefd05c20 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -841,6 +841,7 @@
'src/core/tsi/fake_transport_security.c',
'src/core/tsi/gts_transport_security.c',
'src/core/tsi/ssl_transport_security.c',
+ 'src/core/tsi/transport_security_grpc.c',
'src/core/tsi/transport_security.c',
'src/core/tsi/transport_security_adapter.c',
'src/core/ext/transport/chttp2/server/chttp2_server.c',
diff --git a/build.yaml b/build.yaml
index f57119a87c..a459b9d688 100644
--- a/build.yaml
+++ b/build.yaml
@@ -925,22 +925,33 @@ filegroups:
- src/core/tsi/gts_transport_security.h
- src/core/tsi/ssl_transport_security.h
- src/core/tsi/ssl_types.h
- - src/core/tsi/transport_security.h
- - src/core/tsi/transport_security_adapter.h
- - src/core/tsi/transport_security_interface.h
+ - src/core/tsi/transport_security_grpc.h
src:
- src/core/tsi/fake_transport_security.c
- src/core/tsi/gts_transport_security.c
- src/core/tsi/ssl_transport_security.c
+ - src/core/tsi/transport_security_grpc.c
+ deps:
+ - gpr
+ plugin: grpc_tsi_gts
+ secure: true
+ uses:
+ - tsi_interface
+ - grpc_base
+ - grpc_trace
+- name: tsi_interface
+ headers:
+ - src/core/tsi/transport_security.h
+ - src/core/tsi/transport_security_adapter.h
+ - src/core/tsi/transport_security_interface.h
+ src:
- src/core/tsi/transport_security.c
- src/core/tsi/transport_security_adapter.c
deps:
- gpr
- plugin: grpc_tsi_gts
secure: true
uses:
- grpc_trace
- - grpc_base
- name: grpc++_codegen_base
language: c++
public_headers:
diff --git a/config.m4 b/config.m4
index 1867133139..f6f8531b2f 100644
--- a/config.m4
+++ b/config.m4
@@ -270,6 +270,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/tsi/fake_transport_security.c \
src/core/tsi/gts_transport_security.c \
src/core/tsi/ssl_transport_security.c \
+ src/core/tsi/transport_security_grpc.c \
src/core/tsi/transport_security.c \
src/core/tsi/transport_security_adapter.c \
src/core/ext/transport/chttp2/server/chttp2_server.c \
diff --git a/config.w32 b/config.w32
index 03e83b09e8..1d1a0a4b63 100644
--- a/config.w32
+++ b/config.w32
@@ -247,6 +247,7 @@ if (PHP_GRPC != "no") {
"src\\core\\tsi\\fake_transport_security.c " +
"src\\core\\tsi\\gts_transport_security.c " +
"src\\core\\tsi\\ssl_transport_security.c " +
+ "src\\core\\tsi\\transport_security_grpc.c " +
"src\\core\\tsi\\transport_security.c " +
"src\\core\\tsi\\transport_security_adapter.c " +
"src\\core\\ext\\transport\\chttp2\\server\\chttp2_server.c " +
diff --git a/doc/load-balancing.md b/doc/load-balancing.md
index f56d2b0c73..88ff35496f 100644
--- a/doc/load-balancing.md
+++ b/doc/load-balancing.md
@@ -113,8 +113,8 @@ works:
that indicates which client-side load-balancing policy to use (e.g.,
`round_robin` or `grpclb`).
2. The client instantiates the load balancing policy.
- - Note: If all addresses returned by the resolver are balancer
- addresses, then the client will use the `grpclb` policy, regardless
+ - Note: If any one of the addresses returned by the resolver is a balancer
+ address, then the client will use the `grpclb` policy, regardless
of what load-balancing policy was requested by the service config.
Otherwise, the client will use the load-balancing policy requested
by the service config. If no load-balancing policy is requested
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index dfd991b1c6..4b1a8f38a3 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -290,6 +290,7 @@ Pod::Spec.new do |s|
'src/core/tsi/gts_transport_security.h',
'src/core/tsi/ssl_transport_security.h',
'src/core/tsi/ssl_types.h',
+ 'src/core/tsi/transport_security_grpc.h',
'src/core/tsi/transport_security.h',
'src/core/tsi/transport_security_adapter.h',
'src/core/tsi/transport_security_interface.h',
@@ -648,6 +649,7 @@ Pod::Spec.new do |s|
'src/core/tsi/fake_transport_security.c',
'src/core/tsi/gts_transport_security.c',
'src/core/tsi/ssl_transport_security.c',
+ 'src/core/tsi/transport_security_grpc.c',
'src/core/tsi/transport_security.c',
'src/core/tsi/transport_security_adapter.c',
'src/core/ext/transport/chttp2/server/chttp2_server.c',
@@ -782,6 +784,7 @@ Pod::Spec.new do |s|
'src/core/tsi/gts_transport_security.h',
'src/core/tsi/ssl_transport_security.h',
'src/core/tsi/ssl_types.h',
+ 'src/core/tsi/transport_security_grpc.h',
'src/core/tsi/transport_security.h',
'src/core/tsi/transport_security_adapter.h',
'src/core/tsi/transport_security_interface.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 07ca5de73c..f04a14167b 100755..100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -222,6 +222,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/tsi/gts_transport_security.h )
s.files += %w( src/core/tsi/ssl_transport_security.h )
s.files += %w( src/core/tsi/ssl_types.h )
+ s.files += %w( src/core/tsi/transport_security_grpc.h )
s.files += %w( src/core/tsi/transport_security.h )
s.files += %w( src/core/tsi/transport_security_adapter.h )
s.files += %w( src/core/tsi/transport_security_interface.h )
@@ -580,6 +581,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/tsi/fake_transport_security.c )
s.files += %w( src/core/tsi/gts_transport_security.c )
s.files += %w( src/core/tsi/ssl_transport_security.c )
+ s.files += %w( src/core/tsi/transport_security_grpc.c )
s.files += %w( src/core/tsi/transport_security.c )
s.files += %w( src/core/tsi/transport_security_adapter.c )
s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.c )
diff --git a/include/grpc/impl/codegen/byte_buffer_reader.h b/include/grpc/impl/codegen/byte_buffer_reader.h
index 2ae3f1e20e..dc0f15496f 100644
--- a/include/grpc/impl/codegen/byte_buffer_reader.h
+++ b/include/grpc/impl/codegen/byte_buffer_reader.h
@@ -29,7 +29,7 @@ struct grpc_byte_buffer_reader {
struct grpc_byte_buffer *buffer_in;
struct grpc_byte_buffer *buffer_out;
/** Different current objects correspond to different types of byte buffers */
- union {
+ union grpc_byte_buffer_reader_current {
/** Index into a slice buffer's array of slices */
unsigned index;
} current;
diff --git a/include/grpc/impl/codegen/compression_types.h b/include/grpc/impl/codegen/compression_types.h
index e39c13e88d..f1b2de3f7d 100644
--- a/include/grpc/impl/codegen/compression_types.h
+++ b/include/grpc/impl/codegen/compression_types.h
@@ -84,7 +84,7 @@ typedef struct grpc_compression_options {
* behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL. If present, takes
* precedence over \a default_algorithm.
* TODO(dgq): currently only available for server channels. */
- struct {
+ struct grpc_compression_options_default_level {
int is_set;
grpc_compression_level level;
} default_level;
@@ -92,7 +92,7 @@ typedef struct grpc_compression_options {
/** The default channel compression algorithm. It'll be used in the absence of
* call specific settings. This option corresponds to the channel argument key
* behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM. */
- struct {
+ struct grpc_compression_options_default_algorithm {
int is_set;
grpc_compression_algorithm algorithm;
} default_algorithm;
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 2b2036b24f..8813ec8f35 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -41,11 +41,11 @@ typedef enum {
typedef struct grpc_byte_buffer {
void *reserved;
grpc_byte_buffer_type type;
- union {
- struct {
+ union grpc_byte_buffer_data {
+ struct /* internal */ {
void *reserved[8];
} reserved;
- struct {
+ struct grpc_compressed_buffer {
grpc_compression_algorithm compression;
grpc_slice_buffer slice_buffer;
} raw;
@@ -104,10 +104,10 @@ typedef struct grpc_arg_pointer_vtable {
typedef struct {
grpc_arg_type type;
char *key;
- union {
+ union grpc_arg_value {
char *string;
int integer;
- struct {
+ struct grpc_arg_pointer {
void *p;
const grpc_arg_pointer_vtable *vtable;
} pointer;
@@ -391,7 +391,7 @@ typedef struct grpc_metadata {
/** The following fields are reserved for grpc internal use.
There is no need to initialize them, and they will be set to garbage
during calls to grpc. */
- struct {
+ struct /* internal */ {
void *obfuscated[4];
} internal_data;
} grpc_metadata;
@@ -491,25 +491,25 @@ typedef struct grpc_op {
uint32_t flags;
/** Reserved for future usage */
void *reserved;
- union {
+ union grpc_op_data {
/** Reserved for future usage */
- struct {
+ struct /* internal */ {
void *reserved[8];
} reserved;
- struct {
+ struct grpc_op_send_initial_metadata {
size_t count;
grpc_metadata *metadata;
/** If \a is_set, \a compression_level will be used for the call.
* Otherwise, \a compression_level won't be considered */
- struct {
+ struct grpc_op_send_initial_metadata_maybe_compression_level {
uint8_t is_set;
grpc_compression_level level;
} maybe_compression_level;
} send_initial_metadata;
- struct {
+ struct grpc_op_send_message {
struct grpc_byte_buffer *send_message;
} send_message;
- struct {
+ struct grpc_op_send_status_from_server {
size_t trailing_metadata_count;
grpc_metadata *trailing_metadata;
grpc_status_code status;
@@ -523,16 +523,16 @@ typedef struct grpc_op {
object, recv_initial_metadata->array is owned by the caller).
After the operation completes, call grpc_metadata_array_destroy on this
value, or reuse it in a future op. */
- struct {
+ struct grpc_op_recv_initial_metadata {
grpc_metadata_array *recv_initial_metadata;
} recv_initial_metadata;
/** ownership of the byte buffer is moved to the caller; the caller must
call grpc_byte_buffer_destroy on this value, or reuse it in a future op.
*/
- struct {
+ struct grpc_op_recv_message {
struct grpc_byte_buffer **recv_message;
} recv_message;
- struct {
+ struct grpc_op_recv_status_on_client {
/** ownership of the array is with the caller, but ownership of the
elements stays with the call object (ie key, value members are owned
by the call object, trailing_metadata->array is owned by the caller).
@@ -542,7 +542,7 @@ typedef struct grpc_op {
grpc_status_code *status;
grpc_slice *status_details;
} recv_status_on_client;
- struct {
+ struct grpc_op_recv_close_on_server {
/** out argument, set to 1 if the call failed in any way (seen as a
cancellation on the server), or 0 if the call succeeded */
int *cancelled;
diff --git a/include/grpc/impl/codegen/slice.h b/include/grpc/impl/codegen/slice.h
index 5ec439eb37..a04c683a55 100644
--- a/include/grpc/impl/codegen/slice.h
+++ b/include/grpc/impl/codegen/slice.h
@@ -75,12 +75,12 @@ typedef struct grpc_slice_refcount {
of data that is copied by value. */
struct grpc_slice {
struct grpc_slice_refcount *refcount;
- union {
- struct {
+ union grpc_slice_data {
+ struct grpc_slice_refcounted {
uint8_t *bytes;
size_t length;
} refcounted;
- struct {
+ struct grpc_slice_inlined {
uint8_t length;
uint8_t bytes[GRPC_SLICE_INLINED_SIZE];
} inlined;
diff --git a/package.xml b/package.xml
index 9d3fb1b5de..4e288b3c68 100644
--- a/package.xml
+++ b/package.xml
@@ -236,6 +236,7 @@
<file baseinstalldir="/" name="src/core/tsi/gts_transport_security.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/ssl_transport_security.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/ssl_types.h" role="src" />
+ <file baseinstalldir="/" name="src/core/tsi/transport_security_grpc.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security_interface.h" role="src" />
@@ -594,6 +595,7 @@
<file baseinstalldir="/" name="src/core/tsi/fake_transport_security.c" role="src" />
<file baseinstalldir="/" name="src/core/tsi/gts_transport_security.c" role="src" />
<file baseinstalldir="/" name="src/core/tsi/ssl_transport_security.c" role="src" />
+ <file baseinstalldir="/" name="src/core/tsi/transport_security_grpc.c" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security.c" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/chttp2_server.c" role="src" />
diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c
index 543a73c5ad..823c681abf 100644
--- a/src/core/ext/census/tracing.c
+++ b/src/core/ext/census/tracing.c
@@ -21,7 +21,6 @@
#include <grpc/census.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <openssl/rand.h>
#include "src/core/ext/census/mlog.h"
void trace_start_span(const trace_span_context *span_ctxt,
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 365aa583bb..9472a8e520 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -42,6 +42,7 @@
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/support/murmur_hash.h"
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
@@ -239,22 +240,43 @@ struct grpc_pollset_set {
* condition variable polling definitions
*/
+#define POLLCV_THREAD_GRACE_MS 1000
#define CV_POLL_PERIOD_MS 1000
#define CV_DEFAULT_TABLE_SIZE 16
-typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t;
-
-typedef struct poll_args {
+typedef struct poll_result {
gpr_refcount refcount;
- gpr_cv *cv;
+ cv_node *watchers;
+ int watchcount;
struct pollfd *fds;
nfds_t nfds;
- int timeout;
int retval;
int err;
- gpr_atm status;
+ int completed;
+} poll_result;
+
+typedef struct poll_args {
+ gpr_cv trigger;
+ int trigger_set;
+ struct pollfd *fds;
+ nfds_t nfds;
+ poll_result *result;
+ struct poll_args *next;
+ struct poll_args *prev;
} poll_args;
+// This is a 2-tiered cache, we mantain a hash table
+// of active poll calls, so we can wait on the result
+// of that call. We also maintain a freelist of inactive
+// poll threads.
+typedef struct poll_hash_table {
+ poll_args *free_pollers;
+ poll_args **active_pollers;
+ unsigned int size;
+ unsigned int count;
+} poll_hash_table;
+
+poll_hash_table poll_cache;
cv_fd_table g_cvfds;
/*******************************************************************************
@@ -1277,43 +1299,205 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
* Condition Variable polling extensions
*/
-static void decref_poll_args(poll_args *args) {
- if (gpr_unref(&args->refcount)) {
- gpr_free(args->fds);
- gpr_cv_destroy(args->cv);
- gpr_free(args->cv);
- gpr_free(args);
+static void run_poll(void *args);
+static void cache_poller_locked(poll_args *args);
+
+static void cache_insert_locked(poll_args *args) {
+ uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd),
+ 0xDEADBEEF);
+ key = key % poll_cache.size;
+ if (poll_cache.active_pollers[key]) {
+ poll_cache.active_pollers[key]->prev = args;
}
+ args->next = poll_cache.active_pollers[key];
+ args->prev = NULL;
+ poll_cache.active_pollers[key] = args;
+ poll_cache.count++;
}
-// Poll in a background thread
-static void run_poll(void *arg) {
- int timeout, retval;
- poll_args *pargs = (poll_args *)arg;
- while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
- if (pargs->timeout < 0) {
- timeout = CV_POLL_PERIOD_MS;
- } else {
- timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout);
- pargs->timeout -= timeout;
+static void init_result(poll_args *pargs) {
+ pargs->result = gpr_malloc(sizeof(poll_result));
+ gpr_ref_init(&pargs->result->refcount, 1);
+ pargs->result->watchers = NULL;
+ pargs->result->watchcount = 0;
+ pargs->result->fds = gpr_malloc(sizeof(struct pollfd) * pargs->nfds);
+ memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds);
+ pargs->result->nfds = pargs->nfds;
+ pargs->result->retval = 0;
+ pargs->result->err = 0;
+ pargs->result->completed = 0;
+}
+
+// Creates a poll_args object for a given arguments to poll().
+// This object may return a poll_args in the cache.
+static poll_args *get_poller_locked(struct pollfd *fds, nfds_t count) {
+ uint32_t key =
+ gpr_murmur_hash3(fds, count * sizeof(struct pollfd), 0xDEADBEEF);
+ key = key % poll_cache.size;
+ poll_args *curr = poll_cache.active_pollers[key];
+ while (curr) {
+ if (curr->nfds == count &&
+ memcmp(curr->fds, fds, count * sizeof(struct pollfd)) == 0) {
+ gpr_free(fds);
+ return curr;
}
- retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout);
- if (retval != 0 || pargs->timeout == 0) {
- pargs->retval = retval;
- pargs->err = errno;
- break;
+ curr = curr->next;
+ }
+
+ if (poll_cache.free_pollers) {
+ poll_args *pargs = poll_cache.free_pollers;
+ poll_cache.free_pollers = pargs->next;
+ if (poll_cache.free_pollers) {
+ poll_cache.free_pollers->prev = NULL;
}
+ pargs->fds = fds;
+ pargs->nfds = count;
+ pargs->next = NULL;
+ pargs->prev = NULL;
+ init_result(pargs);
+ cache_poller_locked(pargs);
+ return pargs;
+ }
+
+ poll_args *pargs = gpr_malloc(sizeof(struct poll_args));
+ gpr_cv_init(&pargs->trigger);
+ pargs->fds = fds;
+ pargs->nfds = count;
+ pargs->next = NULL;
+ pargs->prev = NULL;
+ pargs->trigger_set = 0;
+ init_result(pargs);
+ cache_poller_locked(pargs);
+ gpr_thd_id t_id;
+ gpr_thd_options opt = gpr_thd_options_default();
+ gpr_ref(&g_cvfds.pollcount);
+ gpr_thd_options_set_detached(&opt);
+ GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
+ return pargs;
+}
+
+static void cache_delete_locked(poll_args *args) {
+ if (!args->prev) {
+ uint32_t key = gpr_murmur_hash3(
+ args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF);
+ key = key % poll_cache.size;
+ GPR_ASSERT(poll_cache.active_pollers[key] == args);
+ poll_cache.active_pollers[key] = args->next;
+ } else {
+ args->prev->next = args->next;
}
- gpr_mu_lock(&g_cvfds.mu);
- if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
- // Signal main thread that the poll completed
- gpr_atm_no_barrier_store(&pargs->status, COMPLETED);
- gpr_cv_signal(pargs->cv);
+
+ if (args->next) {
+ args->next->prev = args->prev;
}
- decref_poll_args(pargs);
- g_cvfds.pollcount--;
- if (g_cvfds.shutdown && g_cvfds.pollcount == 0) {
- gpr_cv_signal(&g_cvfds.shutdown_complete);
+
+ poll_cache.count--;
+ if (poll_cache.free_pollers) {
+ poll_cache.free_pollers->prev = args;
+ }
+ args->prev = NULL;
+ args->next = poll_cache.free_pollers;
+ gpr_free(args->fds);
+ poll_cache.free_pollers = args;
+}
+
+static void cache_poller_locked(poll_args *args) {
+ if (poll_cache.count + 1 > poll_cache.size / 2) {
+ poll_args **old_active_pollers = poll_cache.active_pollers;
+ poll_cache.size = poll_cache.size * 2;
+ poll_cache.count = 0;
+ poll_cache.active_pollers = gpr_malloc(sizeof(void *) * poll_cache.size);
+ for (unsigned int i = 0; i < poll_cache.size; i++) {
+ poll_cache.active_pollers[i] = NULL;
+ }
+ for (unsigned int i = 0; i < poll_cache.size / 2; i++) {
+ poll_args *curr = old_active_pollers[i];
+ poll_args *next = NULL;
+ while (curr) {
+ next = curr->next;
+ cache_insert_locked(curr);
+ curr = next;
+ }
+ }
+ gpr_free(old_active_pollers);
+ }
+
+ cache_insert_locked(args);
+}
+
+static void cache_destroy_locked(poll_args *args) {
+ if (args->next) {
+ args->next->prev = args->prev;
+ }
+
+ if (args->prev) {
+ args->prev->next = args->next;
+ } else {
+ poll_cache.free_pollers = args->next;
+ }
+
+ gpr_free(args);
+}
+
+static void decref_poll_result(poll_result *res) {
+ if (gpr_unref(&res->refcount)) {
+ GPR_ASSERT(!res->watchers);
+ gpr_free(res->fds);
+ gpr_free(res);
+ }
+}
+
+void remove_cvn(cv_node **head, cv_node *target) {
+ if (target->next) {
+ target->next->prev = target->prev;
+ }
+
+ if (target->prev) {
+ target->prev->next = target->next;
+ } else {
+ *head = target->next;
+ }
+}
+
+gpr_timespec thread_grace;
+
+// Poll in a background thread
+static void run_poll(void *args) {
+ poll_args *pargs = (poll_args *)args;
+ while (1) {
+ poll_result *result = pargs->result;
+ int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS);
+ gpr_mu_lock(&g_cvfds.mu);
+ if (retval != 0) {
+ result->completed = 1;
+ result->retval = retval;
+ result->err = errno;
+ cv_node *watcher = result->watchers;
+ while (watcher) {
+ gpr_cv_signal(watcher->cv);
+ watcher = watcher->next;
+ }
+ }
+ if (result->watchcount == 0 || result->completed) {
+ cache_delete_locked(pargs);
+ decref_poll_result(result);
+ // Leave this polling thread alive for a grace period to do another poll()
+ // op
+ gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
+ deadline = gpr_time_add(deadline, thread_grace);
+ pargs->trigger_set = 0;
+ gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline);
+ if (!pargs->trigger_set) {
+ cache_destroy_locked(pargs);
+ break;
+ }
+ }
+ gpr_mu_unlock(&g_cvfds.mu);
+ }
+
+ // We still have the lock here
+ if (gpr_unref(&g_cvfds.pollcount)) {
+ gpr_cv_signal(&g_cvfds.shutdown_cv);
}
gpr_mu_unlock(&g_cvfds.mu);
}
@@ -1322,24 +1506,29 @@ static void run_poll(void *arg) {
static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
unsigned int i;
int res, idx;
- gpr_cv *pollcv;
- cv_node *cvn, *prev;
+ cv_node *pollcv;
int skip_poll = 0;
nfds_t nsockfds = 0;
- gpr_thd_id t_id;
- gpr_thd_options opt;
- poll_args *pargs = NULL;
+ poll_result *result = NULL;
gpr_mu_lock(&g_cvfds.mu);
- pollcv = gpr_malloc(sizeof(gpr_cv));
- gpr_cv_init(pollcv);
+ pollcv = gpr_malloc(sizeof(cv_node));
+ pollcv->next = NULL;
+ gpr_cv pollcv_cv;
+ gpr_cv_init(&pollcv_cv);
+ pollcv->cv = &pollcv_cv;
+ cv_node *fd_cvs = gpr_malloc(nfds * sizeof(cv_node));
+
for (i = 0; i < nfds; i++) {
fds[i].revents = 0;
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
idx = FD_TO_IDX(fds[i].fd);
- cvn = gpr_malloc(sizeof(cv_node));
- cvn->cv = pollcv;
- cvn->next = g_cvfds.cvfds[idx].cvs;
- g_cvfds.cvfds[idx].cvs = cvn;
+ fd_cvs[i].cv = &pollcv_cv;
+ fd_cvs[i].prev = NULL;
+ fd_cvs[i].next = g_cvfds.cvfds[idx].cvs;
+ if (g_cvfds.cvfds[idx].cvs) {
+ g_cvfds.cvfds[idx].cvs->prev = &(fd_cvs[i]);
+ }
+ g_cvfds.cvfds[idx].cvs = &(fd_cvs[i]);
// Don't bother polling if a wakeup fd is ready
if (g_cvfds.cvfds[idx].is_set) {
skip_poll = 1;
@@ -1349,81 +1538,68 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
}
}
+ gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
+ if (timeout < 0) {
+ deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ } else {
+ deadline =
+ gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
+ }
+
res = 0;
if (!skip_poll && nsockfds > 0) {
- pargs = gpr_malloc(sizeof(struct poll_args));
- // Both the main thread and calling thread get a reference
- gpr_ref_init(&pargs->refcount, 2);
- pargs->cv = pollcv;
- pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
- pargs->nfds = nsockfds;
- pargs->timeout = timeout;
- pargs->retval = 0;
- pargs->err = 0;
- gpr_atm_no_barrier_store(&pargs->status, INPROGRESS);
+ struct pollfd *pollfds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd >= 0) {
- pargs->fds[idx].fd = fds[i].fd;
- pargs->fds[idx].events = fds[i].events;
- pargs->fds[idx].revents = 0;
+ pollfds[idx].fd = fds[i].fd;
+ pollfds[idx].events = fds[i].events;
+ pollfds[idx].revents = 0;
idx++;
}
}
- g_cvfds.pollcount++;
- opt = gpr_thd_options_default();
- gpr_thd_options_set_detached(&opt);
- GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
- // We want the poll() thread to trigger the deadline, so wait forever here
- gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
- if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
- res = pargs->retval;
- errno = pargs->err;
- } else {
- errno = 0;
- gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
+ poll_args *pargs = get_poller_locked(pollfds, nsockfds);
+ result = pargs->result;
+ pollcv->next = result->watchers;
+ pollcv->prev = NULL;
+ if (result->watchers) {
+ result->watchers->prev = pollcv;
}
+ result->watchers = pollcv;
+ result->watchcount++;
+ gpr_ref(&result->refcount);
+
+ pargs->trigger_set = 1;
+ gpr_cv_signal(&pargs->trigger);
+ gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
+ res = result->retval;
+ errno = result->err;
+ result->watchcount--;
+ remove_cvn(&result->watchers, pollcv);
} else if (!skip_poll) {
- gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
- deadline =
- gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
- gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
+ gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
}
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
- cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs;
- prev = NULL;
- while (cvn->cv != pollcv) {
- prev = cvn;
- cvn = cvn->next;
- GPR_ASSERT(cvn);
- }
- if (!prev) {
- g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next;
- } else {
- prev->next = cvn->next;
- }
- gpr_free(cvn);
-
+ remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
fds[i].revents = POLLIN;
if (res >= 0) res++;
}
- } else if (!skip_poll && fds[i].fd >= 0 &&
- gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
- fds[i].revents = pargs->fds[idx].revents;
+ } else if (!skip_poll && fds[i].fd >= 0 && result->completed) {
+ fds[i].revents = result->fds[idx].revents;
idx++;
}
}
- if (pargs) {
- decref_poll_args(pargs);
- } else {
- gpr_cv_destroy(pollcv);
- gpr_free(pollcv);
+ gpr_free(fd_cvs);
+ gpr_free(pollcv);
+ if (result) {
+ decref_poll_result(result);
}
+
gpr_mu_unlock(&g_cvfds.mu);
return res;
@@ -1432,12 +1608,12 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
static void global_cv_fd_table_init() {
gpr_mu_init(&g_cvfds.mu);
gpr_mu_lock(&g_cvfds.mu);
- gpr_cv_init(&g_cvfds.shutdown_complete);
- g_cvfds.shutdown = 0;
- g_cvfds.pollcount = 0;
+ gpr_cv_init(&g_cvfds.shutdown_cv);
+ gpr_ref_init(&g_cvfds.pollcount, 1);
g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
g_cvfds.free_fds = NULL;
+ thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN);
for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
g_cvfds.cvfds[i].is_set = 0;
g_cvfds.cvfds[i].cvs = NULL;
@@ -1447,23 +1623,35 @@ static void global_cv_fd_table_init() {
// Override the poll function with one that supports cvfds
g_cvfds.poll = grpc_poll_function;
grpc_poll_function = &cvfd_poll;
+
+ // Initialize the cache
+ poll_cache.size = 32;
+ poll_cache.count = 0;
+ poll_cache.free_pollers = NULL;
+ poll_cache.active_pollers = gpr_malloc(sizeof(void *) * 32);
+ for (unsigned int i = 0; i < poll_cache.size; i++) {
+ poll_cache.active_pollers[i] = NULL;
+ }
+
gpr_mu_unlock(&g_cvfds.mu);
}
static void global_cv_fd_table_shutdown() {
gpr_mu_lock(&g_cvfds.mu);
- g_cvfds.shutdown = 1;
// Attempt to wait for all abandoned poll() threads to terminate
// Not doing so will result in reported memory leaks
- if (g_cvfds.pollcount > 0) {
- int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu,
+ if (!gpr_unref(&g_cvfds.pollcount)) {
+ int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(3, GPR_TIMESPAN)));
GPR_ASSERT(res == 0);
}
- gpr_cv_destroy(&g_cvfds.shutdown_complete);
+ gpr_cv_destroy(&g_cvfds.shutdown_cv);
grpc_poll_function = g_cvfds.poll;
gpr_free(g_cvfds.cvfds);
+
+ gpr_free(poll_cache.active_pollers);
+
gpr_mu_unlock(&g_cvfds.mu);
gpr_mu_destroy(&g_cvfds.mu);
}
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 833170ceed..41c69add17 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -51,33 +51,6 @@ bool grpc_exec_ctx_has_work(grpc_exec_ctx *exec_ctx) {
!grpc_closure_list_empty(exec_ctx->closure_list);
}
-bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
- bool did_something = 0;
- GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
- for (;;) {
- if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
- grpc_closure *c = exec_ctx->closure_list.head;
- exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
- did_something = true;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- }
- } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
- break;
- }
- }
- GPR_ASSERT(exec_ctx->active_combiner == NULL);
- GPR_TIMER_END("grpc_exec_ctx_flush", 0);
- return did_something;
-}
-
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
exec_ctx->flags |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
grpc_exec_ctx_flush(exec_ctx);
@@ -103,6 +76,29 @@ static void exec_ctx_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
GRPC_ERROR_UNREF(error);
}
+bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
+ bool did_something = 0;
+ GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
+ for (;;) {
+ if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
+ grpc_closure *c = exec_ctx->closure_list.head;
+ exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ did_something = true;
+ exec_ctx_run(exec_ctx, c, error);
+ c = next;
+ }
+ } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
+ break;
+ }
+ }
+ GPR_ASSERT(exec_ctx->active_combiner == NULL);
+ GPR_TIMER_END("grpc_exec_ctx_flush", 0);
+ return did_something;
+}
+
static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) {
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h
index c5dcdc9746..46e84f5843 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.h
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -43,6 +43,7 @@
typedef struct cv_node {
gpr_cv* cv;
struct cv_node* next;
+ struct cv_node* prev;
} cv_node;
typedef struct fd_node {
@@ -53,9 +54,8 @@ typedef struct fd_node {
typedef struct cv_fd_table {
gpr_mu mu;
- int pollcount;
- int shutdown;
- gpr_cv shutdown_complete;
+ gpr_refcount pollcount;
+ gpr_cv shutdown_cv;
fd_node* cvfds;
fd_node* free_fds;
unsigned int size;
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index b9da6e16b2..fc9c9f980f 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -261,7 +261,7 @@ static grpc_error *do_handshaker_next_locked(
grpc_exec_ctx *exec_ctx, security_handshaker *h,
const unsigned char *bytes_received, size_t bytes_received_size) {
// Invoke TSI handshaker.
- unsigned char *bytes_to_send = NULL;
+ const unsigned char *bytes_to_send = NULL;
size_t bytes_to_send_size = 0;
tsi_handshaker_result *handshaker_result = NULL;
tsi_result result = tsi_handshaker_next(
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 3d82a32e82..c20cfbc740 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -235,7 +235,8 @@ typedef struct cq_next_data {
/* Number of outstanding events (+1 if not shut down) */
gpr_atm pending_events;
- int shutdown_called;
+ /** 0 initially. 1 once we initiated shutdown */
+ bool shutdown_called;
} cq_next_data;
typedef struct cq_pluck_data {
@@ -244,15 +245,20 @@ typedef struct cq_pluck_data {
grpc_cq_completion *completed_tail;
/** Number of pending events (+1 if we're not shutdown) */
- gpr_refcount pending_events;
+ gpr_atm pending_events;
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
- /** 0 initially, 1 once we've begun shutting down */
+ /** 0 initially. 1 once we completed shutting */
+ /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
+ * (pending_events == 0). So consider removing this in future and use
+ * pending_events */
gpr_atm shutdown;
- int shutdown_called;
+
+ /** 0 initially. 1 once we initiated shutdown */
+ bool shutdown_called;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
@@ -436,7 +442,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
static void cq_init_next(void *ptr) {
cq_next_data *cqd = ptr;
- /* Initial ref is dropped by grpc_completion_queue_shutdown */
+ /* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->shutdown_called = false;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
@@ -451,12 +457,12 @@ static void cq_destroy_next(void *ptr) {
static void cq_init_pluck(void *ptr) {
cq_pluck_data *cqd = ptr;
- /* Initial ref is dropped by grpc_completion_queue_shutdown */
- gpr_ref_init(&cqd->pending_events, 1);
+ /* Initial count is dropped by grpc_completion_queue_shutdown */
+ gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->completed_tail = &cqd->completed_head;
cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
gpr_atm_no_barrier_store(&cqd->shutdown, 0);
- cqd->shutdown_called = 0;
+ cqd->shutdown_called = false;
cqd->num_pluckers = 0;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
}
@@ -549,24 +555,32 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
-static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+/* Atomically increments a counter only if the counter is not zero. Returns
+ * true if the increment was successful; false if the counter is zero */
+static bool atm_inc_if_nonzero(gpr_atm *counter) {
while (true) {
- gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events);
+ gpr_atm count = gpr_atm_no_barrier_load(counter);
+ /* If zero, we are done. If not, we must to a CAS (instead of an atomic
+ * increment) to maintain the contract: do not increment the counter if it
+ * is zero. */
if (count == 0) {
return false;
- } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) {
+ } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) {
break;
}
}
+
return true;
}
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+ return atm_inc_if_nonzero(&cqd->pending_events);
+}
+
static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_ref(&cqd->pending_events);
- return true;
+ return atm_inc_if_nonzero(&cqd->pending_events);
}
bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
@@ -704,8 +718,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next);
cqd->completed_tail = storage;
- int shutdown = gpr_unref(&cqd->pending_events);
- if (!shutdown) {
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ cq_finish_shutdown_pluck(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ } else {
grpc_pollset_worker *pluck_worker = NULL;
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag) {
@@ -725,9 +741,6 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(kick_error);
}
- } else {
- cq_finish_shutdown_pluck(exec_ctx, cq);
- gpr_mu_unlock(cq->mu);
}
GPR_TIMER_END("cq_end_op_for_pluck", 0);
@@ -952,6 +965,12 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
cq_next_data *cqd = DATA_FROM_CQ(cq);
+ /* Need an extra ref for cq here because:
+ * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
+ * Pollset shutdown decrements the cq ref count which can potentially destroy
+ * the cq (if that happens to be the last ref).
+ * Creating an extra ref here prevents the cq from getting destroyed while
+ * this function is still active */
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
@@ -960,7 +979,7 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
- cqd->shutdown_called = 1;
+ cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
cq_finish_shutdown_next(exec_ctx, cq);
}
@@ -1172,21 +1191,32 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
&cq->pollset_shutdown_done);
}
+/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
+ * merging them is a bit tricky and probably not worth it */
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ /* Need an extra ref for cq here because:
+ * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
+ * Pollset shutdown decrements the cq ref count which can potentially destroy
+ * the cq (if that happens to be the last ref).
+ * Creating an extra ref here prevents the cq from getting destroyed while
+ * this function is still active */
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
- cqd->shutdown_called = 1;
- if (gpr_unref(&cqd->pending_events)) {
+ cqd->shutdown_called = true;
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
cq_finish_shutdown_pluck(exec_ctx, cq);
}
gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c
index 810447313c..967126ecee 100644
--- a/src/core/tsi/fake_transport_security.c
+++ b/src/core/tsi/fake_transport_security.c
@@ -407,8 +407,10 @@ static void fake_handshaker_result_destroy(tsi_handshaker_result *self) {
static const tsi_handshaker_result_vtable handshaker_result_vtable = {
fake_handshaker_result_extract_peer,
+ NULL, /* create_zero_copy_grpc_protector */
fake_handshaker_result_create_frame_protector,
- fake_handshaker_result_get_unused_bytes, fake_handshaker_result_destroy,
+ fake_handshaker_result_get_unused_bytes,
+ fake_handshaker_result_destroy,
};
static tsi_result fake_handshaker_result_create(
@@ -530,7 +532,7 @@ static void fake_handshaker_destroy(tsi_handshaker *self) {
static tsi_result fake_handshaker_next(
tsi_handshaker *self, const unsigned char *received_bytes,
- size_t received_bytes_size, unsigned char **bytes_to_send,
+ size_t received_bytes_size, const unsigned char **bytes_to_send,
size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result,
tsi_handshaker_on_next_done_cb cb, void *user_data) {
/* Sanity check the arguments. */
diff --git a/src/core/tsi/transport_security.c b/src/core/tsi/transport_security.c
index 2b1f4310c1..76213072a3 100644
--- a/src/core/tsi/transport_security.c
+++ b/src/core/tsi/transport_security.c
@@ -74,14 +74,12 @@ tsi_result tsi_frame_protector_protect(tsi_frame_protector *self,
size_t *unprotected_bytes_size,
unsigned char *protected_output_frames,
size_t *protected_output_frames_size) {
- if (self == NULL || unprotected_bytes == NULL ||
+ if (self == NULL || self->vtable == NULL || unprotected_bytes == NULL ||
unprotected_bytes_size == NULL || protected_output_frames == NULL ||
protected_output_frames_size == NULL) {
return TSI_INVALID_ARGUMENT;
}
- if (self->vtable == NULL || self->vtable->protect == NULL) {
- return TSI_UNIMPLEMENTED;
- }
+ if (self->vtable->protect == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->protect(self, unprotected_bytes, unprotected_bytes_size,
protected_output_frames,
protected_output_frames_size);
@@ -90,13 +88,11 @@ tsi_result tsi_frame_protector_protect(tsi_frame_protector *self,
tsi_result tsi_frame_protector_protect_flush(
tsi_frame_protector *self, unsigned char *protected_output_frames,
size_t *protected_output_frames_size, size_t *still_pending_size) {
- if (self == NULL || protected_output_frames == NULL ||
+ if (self == NULL || self->vtable == NULL || protected_output_frames == NULL ||
protected_output_frames_size == NULL || still_pending_size == NULL) {
return TSI_INVALID_ARGUMENT;
}
- if (self->vtable == NULL || self->vtable->protect_flush == NULL) {
- return TSI_UNIMPLEMENTED;
- }
+ if (self->vtable->protect_flush == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->protect_flush(self, protected_output_frames,
protected_output_frames_size,
still_pending_size);
@@ -106,14 +102,12 @@ tsi_result tsi_frame_protector_unprotect(
tsi_frame_protector *self, const unsigned char *protected_frames_bytes,
size_t *protected_frames_bytes_size, unsigned char *unprotected_bytes,
size_t *unprotected_bytes_size) {
- if (self == NULL || protected_frames_bytes == NULL ||
+ if (self == NULL || self->vtable == NULL || protected_frames_bytes == NULL ||
protected_frames_bytes_size == NULL || unprotected_bytes == NULL ||
unprotected_bytes_size == NULL) {
return TSI_INVALID_ARGUMENT;
}
- if (self->vtable == NULL || self->vtable->unprotect == NULL) {
- return TSI_UNIMPLEMENTED;
- }
+ if (self->vtable->unprotect == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->unprotect(self, protected_frames_bytes,
protected_frames_bytes_size, unprotected_bytes,
unprotected_bytes_size);
@@ -131,48 +125,44 @@ void tsi_frame_protector_destroy(tsi_frame_protector *self) {
tsi_result tsi_handshaker_get_bytes_to_send_to_peer(tsi_handshaker *self,
unsigned char *bytes,
size_t *bytes_size) {
- if (self == NULL || bytes == NULL || bytes_size == NULL) {
+ if (self == NULL || self->vtable == NULL || bytes == NULL ||
+ bytes_size == NULL) {
return TSI_INVALID_ARGUMENT;
}
if (self->frame_protector_created) return TSI_FAILED_PRECONDITION;
- if (self->vtable == NULL || self->vtable->get_bytes_to_send_to_peer == NULL) {
- return TSI_UNIMPLEMENTED;
- }
+ if (self->vtable->get_bytes_to_send_to_peer == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->get_bytes_to_send_to_peer(self, bytes, bytes_size);
}
tsi_result tsi_handshaker_process_bytes_from_peer(tsi_handshaker *self,
const unsigned char *bytes,
size_t *bytes_size) {
- if (self == NULL || bytes == NULL || bytes_size == NULL) {
+ if (self == NULL || self->vtable == NULL || bytes == NULL ||
+ bytes_size == NULL) {
return TSI_INVALID_ARGUMENT;
}
if (self->frame_protector_created) return TSI_FAILED_PRECONDITION;
- if (self->vtable == NULL || self->vtable->process_bytes_from_peer == NULL) {
- return TSI_UNIMPLEMENTED;
- }
+ if (self->vtable->process_bytes_from_peer == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->process_bytes_from_peer(self, bytes, bytes_size);
}
tsi_result tsi_handshaker_get_result(tsi_handshaker *self) {
- if (self == NULL) return TSI_INVALID_ARGUMENT;
+ if (self == NULL || self->vtable == NULL) return TSI_INVALID_ARGUMENT;
if (self->frame_protector_created) return TSI_FAILED_PRECONDITION;
- if (self->vtable == NULL || self->vtable->get_result == NULL) {
- return TSI_UNIMPLEMENTED;
- }
+ if (self->vtable->get_result == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->get_result(self);
}
tsi_result tsi_handshaker_extract_peer(tsi_handshaker *self, tsi_peer *peer) {
- if (self == NULL || peer == NULL) return TSI_INVALID_ARGUMENT;
+ if (self == NULL || self->vtable == NULL || peer == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
memset(peer, 0, sizeof(tsi_peer));
if (self->frame_protector_created) return TSI_FAILED_PRECONDITION;
if (tsi_handshaker_get_result(self) != TSI_OK) {
return TSI_FAILED_PRECONDITION;
}
- if (self->vtable == NULL || self->vtable->extract_peer == NULL) {
- return TSI_UNIMPLEMENTED;
- }
+ if (self->vtable->extract_peer == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->extract_peer(self, peer);
}
@@ -180,14 +170,12 @@ tsi_result tsi_handshaker_create_frame_protector(
tsi_handshaker *self, size_t *max_protected_frame_size,
tsi_frame_protector **protector) {
tsi_result result;
- if (self == NULL || protector == NULL) return TSI_INVALID_ARGUMENT;
- if (self->frame_protector_created) return TSI_FAILED_PRECONDITION;
- if (tsi_handshaker_get_result(self) != TSI_OK) {
- return TSI_FAILED_PRECONDITION;
- }
- if (self->vtable == NULL || self->vtable->create_frame_protector == NULL) {
- return TSI_UNIMPLEMENTED;
+ if (self == NULL || self->vtable == NULL || protector == NULL) {
+ return TSI_INVALID_ARGUMENT;
}
+ if (self->frame_protector_created) return TSI_FAILED_PRECONDITION;
+ if (tsi_handshaker_get_result(self) != TSI_OK) return TSI_FAILED_PRECONDITION;
+ if (self->vtable->create_frame_protector == NULL) return TSI_UNIMPLEMENTED;
result = self->vtable->create_frame_protector(self, max_protected_frame_size,
protector);
if (result == TSI_OK) {
@@ -198,14 +186,12 @@ tsi_result tsi_handshaker_create_frame_protector(
tsi_result tsi_handshaker_next(
tsi_handshaker *self, const unsigned char *received_bytes,
- size_t received_bytes_size, unsigned char **bytes_to_send,
+ size_t received_bytes_size, const unsigned char **bytes_to_send,
size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result,
tsi_handshaker_on_next_done_cb cb, void *user_data) {
- if (self == NULL) return TSI_INVALID_ARGUMENT;
+ if (self == NULL || self->vtable == NULL) return TSI_INVALID_ARGUMENT;
if (self->handshaker_result_created) return TSI_FAILED_PRECONDITION;
- if (self->vtable == NULL || self->vtable->next == NULL) {
- return TSI_UNIMPLEMENTED;
- }
+ if (self->vtable->next == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->next(self, received_bytes, received_bytes_size,
bytes_to_send, bytes_to_send_size,
handshaker_result, cb, user_data);
@@ -220,21 +206,21 @@ void tsi_handshaker_destroy(tsi_handshaker *self) {
tsi_result tsi_handshaker_result_extract_peer(const tsi_handshaker_result *self,
tsi_peer *peer) {
- if (self == NULL || peer == NULL) return TSI_INVALID_ARGUMENT;
- memset(peer, 0, sizeof(tsi_peer));
- if (self->vtable == NULL || self->vtable->extract_peer == NULL) {
- return TSI_UNIMPLEMENTED;
+ if (self == NULL || self->vtable == NULL || peer == NULL) {
+ return TSI_INVALID_ARGUMENT;
}
+ memset(peer, 0, sizeof(tsi_peer));
+ if (self->vtable->extract_peer == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->extract_peer(self, peer);
}
tsi_result tsi_handshaker_result_create_frame_protector(
const tsi_handshaker_result *self, size_t *max_protected_frame_size,
tsi_frame_protector **protector) {
- if (self == NULL || protector == NULL) return TSI_INVALID_ARGUMENT;
- if (self->vtable == NULL || self->vtable->create_frame_protector == NULL) {
- return TSI_UNIMPLEMENTED;
+ if (self == NULL || self->vtable == NULL || protector == NULL) {
+ return TSI_INVALID_ARGUMENT;
}
+ if (self->vtable->create_frame_protector == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->create_frame_protector(self, max_protected_frame_size,
protector);
}
@@ -242,12 +228,11 @@ tsi_result tsi_handshaker_result_create_frame_protector(
tsi_result tsi_handshaker_result_get_unused_bytes(
const tsi_handshaker_result *self, const unsigned char **bytes,
size_t *bytes_size) {
- if (self == NULL || bytes == NULL || bytes_size == NULL) {
+ if (self == NULL || self->vtable == NULL || bytes == NULL ||
+ bytes_size == NULL) {
return TSI_INVALID_ARGUMENT;
}
- if (self->vtable == NULL || self->vtable->get_unused_bytes == NULL) {
- return TSI_UNIMPLEMENTED;
- }
+ if (self->vtable->get_unused_bytes == NULL) return TSI_UNIMPLEMENTED;
return self->vtable->get_unused_bytes(self, bytes, bytes_size);
}
diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h
index 2c7db6bca9..b0d7039850 100644
--- a/src/core/tsi/transport_security.h
+++ b/src/core/tsi/transport_security.h
@@ -70,7 +70,8 @@ typedef struct {
tsi_frame_protector **protector);
void (*destroy)(tsi_handshaker *self);
tsi_result (*next)(tsi_handshaker *self, const unsigned char *received_bytes,
- size_t received_bytes_size, unsigned char **bytes_to_send,
+ size_t received_bytes_size,
+ const unsigned char **bytes_to_send,
size_t *bytes_to_send_size,
tsi_handshaker_result **handshaker_result,
tsi_handshaker_on_next_done_cb cb, void *user_data);
@@ -86,6 +87,10 @@ struct tsi_handshaker {
See transport_security_interface.h for documentation. */
typedef struct {
tsi_result (*extract_peer)(const tsi_handshaker_result *self, tsi_peer *peer);
+ tsi_result (*create_zero_copy_grpc_protector)(
+ const tsi_handshaker_result *self,
+ size_t *max_output_protected_frame_size,
+ tsi_zero_copy_grpc_protector **protector);
tsi_result (*create_frame_protector)(const tsi_handshaker_result *self,
size_t *max_output_protected_frame_size,
tsi_frame_protector **protector);
diff --git a/src/core/tsi/transport_security_adapter.c b/src/core/tsi/transport_security_adapter.c
index b6dc660c47..1c2a57b3bd 100644
--- a/src/core/tsi/transport_security_adapter.c
+++ b/src/core/tsi/transport_security_adapter.c
@@ -66,8 +66,11 @@ static void adapter_result_destroy(tsi_handshaker_result *self) {
}
static const tsi_handshaker_result_vtable result_vtable = {
- adapter_result_extract_peer, adapter_result_create_frame_protector,
- adapter_result_get_unused_bytes, adapter_result_destroy,
+ adapter_result_extract_peer,
+ NULL, /* create_zero_copy_grpc_protector */
+ adapter_result_create_frame_protector,
+ adapter_result_get_unused_bytes,
+ adapter_result_destroy,
};
/* Ownership of wrapped tsi_handshaker is transferred to the result object. */
@@ -140,7 +143,7 @@ static void adapter_destroy(tsi_handshaker *self) {
static tsi_result adapter_next(
tsi_handshaker *self, const unsigned char *received_bytes,
- size_t received_bytes_size, unsigned char **bytes_to_send,
+ size_t received_bytes_size, const unsigned char **bytes_to_send,
size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result,
tsi_handshaker_on_next_done_cb cb, void *user_data) {
/* Input sanity check. */
diff --git a/src/core/tsi/transport_security_grpc.c b/src/core/tsi/transport_security_grpc.c
new file mode 100644
index 0000000000..5bcfdfa61f
--- /dev/null
+++ b/src/core/tsi/transport_security_grpc.c
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/tsi/transport_security_grpc.h"
+
+/* This method creates a tsi_zero_copy_grpc_protector object. */
+tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector(
+ const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
+ tsi_zero_copy_grpc_protector **protector) {
+ if (self == NULL || self->vtable == NULL || protector == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ if (self->vtable->create_zero_copy_grpc_protector == NULL) {
+ return TSI_UNIMPLEMENTED;
+ }
+ return self->vtable->create_zero_copy_grpc_protector(
+ self, max_output_protected_frame_size, protector);
+}
+
+/* --- tsi_zero_copy_grpc_protector common implementation. ---
+
+ Calls specific implementation after state/input validation. */
+
+tsi_result tsi_zero_copy_grpc_protector_protect(
+ tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices,
+ grpc_slice_buffer *protected_slices) {
+ if (self == NULL || self->vtable == NULL || unprotected_slices == NULL ||
+ protected_slices == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ if (self->vtable->protect == NULL) return TSI_UNIMPLEMENTED;
+ return self->vtable->protect(self, unprotected_slices, protected_slices);
+}
+
+tsi_result tsi_zero_copy_grpc_protector_unprotect(
+ tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices,
+ grpc_slice_buffer *unprotected_slices) {
+ if (self == NULL || self->vtable == NULL || protected_slices == NULL ||
+ unprotected_slices == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ if (self->vtable->unprotect == NULL) return TSI_UNIMPLEMENTED;
+ return self->vtable->unprotect(self, protected_slices, unprotected_slices);
+}
+
+void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self) {
+ if (self == NULL) return;
+ self->vtable->destroy(self);
+}
diff --git a/src/core/tsi/transport_security_grpc.h b/src/core/tsi/transport_security_grpc.h
new file mode 100644
index 0000000000..5ab5297cc4
--- /dev/null
+++ b/src/core/tsi/transport_security_grpc.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_TSI_TRANSPORT_SECURITY_GRPC_H
+#define GRPC_CORE_TSI_TRANSPORT_SECURITY_GRPC_H
+
+#include <grpc/slice_buffer.h>
+#include "src/core/tsi/transport_security.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* This method creates a tsi_zero_copy_grpc_protector object. It return TSI_OK
+ assuming there is no fatal error.
+ The caller is responsible for destroying the protector. */
+tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector(
+ const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
+ tsi_zero_copy_grpc_protector **protector);
+
+/* -- tsi_zero_copy_grpc_protector object -- */
+
+/* Outputs protected frames.
+ - unprotected_slices is the unprotected data to be protected.
+ - protected_slices is the protected output frames. One or more frames
+ may be produced in this protect function.
+ - This method returns TSI_OK in case of success or a specific error code in
+ case of failure. */
+tsi_result tsi_zero_copy_grpc_protector_protect(
+ tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices,
+ grpc_slice_buffer *protected_slices);
+
+/* Outputs unprotected bytes.
+ - protected_slices is the bytes of protected frames.
+ - unprotected_slices is the unprotected output data.
+ - This method returns TSI_OK in case of success. Success includes cases where
+ there is not enough data to output in which case unprotected_slices has 0
+ bytes. */
+tsi_result tsi_zero_copy_grpc_protector_unprotect(
+ tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices,
+ grpc_slice_buffer *unprotected_slices);
+
+/* Destroys the tsi_zero_copy_grpc_protector object. */
+void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self);
+
+/* Base for tsi_zero_copy_grpc_protector implementations. */
+typedef struct {
+ tsi_result (*protect)(tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *unprotected_slices,
+ grpc_slice_buffer *protected_slices);
+ tsi_result (*unprotect)(tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *protected_slices,
+ grpc_slice_buffer *unprotected_slices);
+ void (*destroy)(tsi_zero_copy_grpc_protector *self);
+} tsi_zero_copy_grpc_protector_vtable;
+
+struct tsi_zero_copy_grpc_protector {
+ const tsi_zero_copy_grpc_protector_vtable *vtable;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* GRPC_CORE_TSI_TRANSPORT_SECURITY_GRPC_H */
diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h
index 39ba8addc4..80c426bbdb 100644
--- a/src/core/tsi/transport_security_interface.h
+++ b/src/core/tsi/transport_security_interface.h
@@ -62,6 +62,15 @@ const char *tsi_result_to_string(tsi_result result);
extern grpc_tracer_flag tsi_tracing_enabled;
+/* -- tsi_zero_copy_grpc_protector object --
+
+ This object protects and unprotects grpc slice buffers with zero or minimized
+ memory copy once the handshake is done. Implementations of this object must be
+ thread compatible. This object depends on grpc and the details of this object
+ is defined in transport_security_grpc.h. */
+
+typedef struct tsi_zero_copy_grpc_protector tsi_zero_copy_grpc_protector;
+
/* --- tsi_frame_protector object ---
This object protects and unprotects buffers once the handshake is done.
@@ -429,7 +438,7 @@ typedef void (*tsi_handshaker_on_next_done_cb)(
tsi_handshaker object. */
tsi_result tsi_handshaker_next(
tsi_handshaker *self, const unsigned char *received_bytes,
- size_t received_bytes_size, unsigned char **bytes_to_send,
+ size_t received_bytes_size, const unsigned char **bytes_to_send,
size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result,
tsi_handshaker_on_next_done_cb cb, void *user_data);
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 0f3a82c605..fc9d5599f2 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -18,6 +18,7 @@
using System;
using System.Linq;
+using System.Threading;
using Grpc.Core;
using NUnit.Framework;
@@ -75,5 +76,19 @@ namespace Grpc.Core.Tests
var parts = coreVersion.Split('.');
Assert.AreEqual(3, parts.Length);
}
+
+ [Test]
+ public void ShuttingDownEventIsFired()
+ {
+ var cts = new CancellationTokenSource();
+ var handler = new EventHandler((sender, args) => { cts.Cancel(); });
+
+ GrpcEnvironment.ShuttingDown += handler;
+ var env = GrpcEnvironment.AddRef();
+ GrpcEnvironment.ReleaseAsync().Wait();
+ GrpcEnvironment.ShuttingDown -= handler;
+
+ Assert.IsTrue(cts.Token.IsCancellationRequested);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 0663ee9215..cbc7d2078c 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -49,7 +49,7 @@ namespace Grpc.Core
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
- bool isClosed;
+ bool isShutdown;
/// <summary>
/// Returns a reference-counted instance of initialized gRPC environment.
@@ -238,6 +238,12 @@ namespace Grpc.Core
}
/// <summary>
+ /// Occurs when <c>GrpcEnvironment</c> is about the start the shutdown logic.
+ /// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first).
+ /// </summary>
+ public static event EventHandler ShuttingDown;
+
+ /// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
@@ -311,13 +317,16 @@ namespace Grpc.Core
/// </summary>
private async Task ShutdownAsync()
{
- if (isClosed)
+ if (isShutdown)
{
- throw new InvalidOperationException("Close has already been called");
+ throw new InvalidOperationException("ShutdownAsync has already been called");
}
+
+ await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false);
+
await threadPool.StopAsync().ConfigureAwait(false);
GrpcNativeShutdown();
- isClosed = true;
+ isShutdown = true;
debugStats.CheckOK();
}
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index 71e6904008..26095a78f9 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -260,7 +260,10 @@ class SendClientCloseOp : public Op {
class SendServerStatusOp : public Op {
public:
- SendServerStatusOp() { grpc_metadata_array_init(&status_metadata); }
+ SendServerStatusOp() {
+ details = grpc_empty_slice();
+ grpc_metadata_array_init(&status_metadata);
+ }
~SendServerStatusOp() {
grpc_slice_unref(details);
DestroyMetadataArray(&status_metadata);
@@ -381,7 +384,10 @@ class ReadMessageOp : public Op {
class ClientStatusOp : public Op {
public:
- ClientStatusOp() { grpc_metadata_array_init(&metadata_array); }
+ ClientStatusOp() {
+ grpc_metadata_array_init(&metadata_array);
+ status_details = grpc_empty_slice();
+ }
~ClientStatusOp() {
grpc_metadata_array_destroy(&metadata_array);
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index aebd298e32..b5246c4f31 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -188,6 +188,103 @@ describe('call', function() {
}, TypeError);
});
});
+ describe('startBatch with message', function() {
+ it('should fail with null argument', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_MESSAGE] = null;
+ call.startBatch(batch, function(){});
+ }, TypeError);
+ });
+ it('should fail with numeric argument', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_MESSAGE] = 5;
+ call.startBatch(batch, function(){});
+ }, TypeError);
+ });
+ it('should fail with string argument', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_MESSAGE] = 'value';
+ call.startBatch(batch, function(){});
+ }, TypeError);
+ });
+ });
+ describe('startBatch with status', function() {
+ it('should fail without a code', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ details: 'details string',
+ metadata: {}
+ };
+ call.startBatch(batch, function(){});
+ }, TypeError);
+ });
+ it('should fail without details', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: 0,
+ metadata: {}
+ };
+ call.startBatch(batch, function(){});
+ }, TypeError);
+ });
+ it('should fail without metadata', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: 0,
+ details: 'details string'
+ };
+ call.startBatch(batch, function(){});
+ }, TypeError);
+ });
+ it('should fail with incorrectly typed code argument', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: 'code string',
+ details: 'details string',
+ metadata: {}
+ };
+ call.startBatch(batch, function(){});
+ }, TypeError);
+ });
+ it('should fail with incorrectly typed details argument', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: 0,
+ details: 5,
+ metadata: {}
+ };
+ call.startBatch(batch, function(){});
+ }, TypeError);
+ });
+ it('should fail with incorrectly typed metadata argument', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: 0,
+ details: 'details string',
+ metadata: 'abc'
+ };
+ call.startBatch(batch, function(){});
+ }, TypeError);
+ });
+ });
describe('cancel', function() {
it('should succeed', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index e52d43e81d..dc4d28f95b 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -246,6 +246,7 @@ CORE_SOURCE_FILES = [
'src/core/tsi/fake_transport_security.c',
'src/core/tsi/gts_transport_security.c',
'src/core/tsi/ssl_transport_security.c',
+ 'src/core/tsi/transport_security_grpc.c',
'src/core/tsi/transport_security.c',
'src/core/tsi/transport_security_adapter.c',
'src/core/ext/transport/chttp2/server/chttp2_server.c',
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index fd1a13563a..4901fc9218 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1421,6 +1421,8 @@ src/core/tsi/transport_security.c \
src/core/tsi/transport_security.h \
src/core/tsi/transport_security_adapter.c \
src/core/tsi/transport_security_adapter.h \
+src/core/tsi/transport_security_grpc.c \
+src/core/tsi/transport_security_grpc.h \
src/core/tsi/transport_security_interface.h \
third_party/nanopb/pb_common.c \
third_party/nanopb/pb_decode.c \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 41a299dd62..30fef6fc0a 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -9095,16 +9095,15 @@
"deps": [
"gpr",
"grpc_base",
- "grpc_trace"
+ "grpc_trace",
+ "tsi_interface"
],
"headers": [
"src/core/tsi/fake_transport_security.h",
"src/core/tsi/gts_transport_security.h",
"src/core/tsi/ssl_transport_security.h",
"src/core/tsi/ssl_types.h",
- "src/core/tsi/transport_security.h",
- "src/core/tsi/transport_security_adapter.h",
- "src/core/tsi/transport_security_interface.h"
+ "src/core/tsi/transport_security_grpc.h"
],
"is_filegroup": true,
"language": "c",
@@ -9117,6 +9116,26 @@
"src/core/tsi/ssl_transport_security.c",
"src/core/tsi/ssl_transport_security.h",
"src/core/tsi/ssl_types.h",
+ "src/core/tsi/transport_security_grpc.c",
+ "src/core/tsi/transport_security_grpc.h"
+ ],
+ "third_party": false,
+ "type": "filegroup"
+ },
+ {
+ "deps": [
+ "gpr",
+ "grpc_trace"
+ ],
+ "headers": [
+ "src/core/tsi/transport_security.h",
+ "src/core/tsi/transport_security_adapter.h",
+ "src/core/tsi/transport_security_interface.h"
+ ],
+ "is_filegroup": true,
+ "language": "c",
+ "name": "tsi_interface",
+ "src": [
"src/core/tsi/transport_security.c",
"src/core/tsi/transport_security.h",
"src/core/tsi/transport_security_adapter.c",
diff --git a/tools/run_tests/run_build_statistics.py b/tools/run_tests/run_build_statistics.py
index d63dc3e86f..0ac6fc50aa 100755
--- a/tools/run_tests/run_build_statistics.py
+++ b/tools/run_tests/run_build_statistics.py
@@ -152,16 +152,13 @@ def _process_build(json_url, console_url):
failure_count = test_result['failCount']
build_result['pass_count'] = test_result['passCount']
build_result['failure_count'] = failure_count
+ # This means Jenkins failure occurred.
build_result['no_report_files_found'] = _no_report_files_found(html)
- if failure_count > 0:
+ # Only check errors if Jenkins failure occurred.
+ if build_result['no_report_files_found']:
error_list, known_error_count = _scrape_for_known_errors(html)
- unknown_error_count = failure_count - known_error_count
- # This can happen if the same error occurs multiple times in one test.
- if failure_count < known_error_count:
- print('====> Some errors are duplicates.')
- unknown_error_count = 0
- error_list.append({'description': _UNKNOWN_ERROR,
- 'count': unknown_error_count})
+ if not error_list:
+ error_list.append({'description': _UNKNOWN_ERROR, 'count': 1})
except Exception as e:
print('====> Got exception for %s: %s.' % (json_url, str(e)))
print('====> Parsing errors from %s.' % console_url)
@@ -176,6 +173,8 @@ def _process_build(json_url, console_url):
if error_list:
build_result['error'] = error_list
+ else:
+ build_result['error'] = [{'description': '', 'count': 0}]
return build_result
diff --git a/tools/run_tests/sanity/core_untyped_structs.sh b/tools/run_tests/sanity/core_untyped_structs.sh
new file mode 100755
index 0000000000..792dd68fdd
--- /dev/null
+++ b/tools/run_tests/sanity/core_untyped_structs.sh
@@ -0,0 +1,27 @@
+#!/bin/sh
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+
+cd `dirname $0`/../../..
+
+#
+# Make sure that all core struct/unions have a name or are typedef'ed
+#
+
+egrep -Irn '(struct|union) *{' include/grpc |
+ egrep -v typedef |
+ diff - /dev/null
+
diff --git a/tools/run_tests/sanity/sanity_tests.yaml b/tools/run_tests/sanity/sanity_tests.yaml
index a86ebee7b4..7e582bc40b 100644
--- a/tools/run_tests/sanity/sanity_tests.yaml
+++ b/tools/run_tests/sanity/sanity_tests.yaml
@@ -6,6 +6,7 @@
- script: tools/run_tests/sanity/check_test_filtering.py
- script: tools/run_tests/sanity/check_tracer_sanity.py
- script: tools/run_tests/sanity/core_banned_functions.py
+- script: tools/run_tests/sanity/core_untyped_structs.sh
- script: tools/buildgen/generate_projects.sh -j 3
cpu_cost: 3
- script: tools/distrib/check_copyright.py
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj
index 3b18b42ce9..8a659280a4 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj
@@ -347,6 +347,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\gts_transport_security.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\ssl_transport_security.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\ssl_types.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_adapter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_interface.h" />
@@ -892,6 +893,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\tsi\ssl_transport_security.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_adapter.c">
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
index 546f8a57a8..6879047f55 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
@@ -556,6 +556,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\tsi\ssl_transport_security.c">
<Filter>src\core\tsi</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.c">
+ <Filter>src\core\tsi</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security.c">
<Filter>src\core\tsi</Filter>
</ClCompile>
@@ -1010,6 +1013,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\ssl_types.h">
<Filter>src\core\tsi</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.h">
+ <Filter>src\core\tsi</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security.h">
<Filter>src\core\tsi</Filter>
</ClInclude>