diff options
40 files changed, 858 insertions, 272 deletions
@@ -1412,31 +1412,45 @@ grpc_cc_library( ) grpc_cc_library( + name = "tsi_interface", + srcs = [ + "src/core/tsi/transport_security.c", + "src/core/tsi/transport_security_adapter.c", + ], + hdrs = [ + "src/core/tsi/transport_security.h", + "src/core/tsi/transport_security_adapter.h", + "src/core/tsi/transport_security_interface.h", + ], + language = "c", + deps = [ + "gpr", + "grpc_trace", + ], +) + +grpc_cc_library( name = "tsi", srcs = [ "src/core/tsi/fake_transport_security.c", "src/core/tsi/gts_transport_security.c", "src/core/tsi/ssl_transport_security.c", - "src/core/tsi/transport_security.c", - "src/core/tsi/transport_security_adapter.c", + "src/core/tsi/transport_security_grpc.c", ], hdrs = [ "src/core/tsi/fake_transport_security.h", "src/core/tsi/gts_transport_security.h", "src/core/tsi/ssl_transport_security.h", "src/core/tsi/ssl_types.h", - "src/core/tsi/transport_security.h", - "src/core/tsi/transport_security_adapter.h", - "src/core/tsi/transport_security_interface.h", + "src/core/tsi/transport_security_grpc.h", ], external_deps = [ "libssl", ], language = "c", deps = [ - "gpr", "grpc_base", - "grpc_trace", + "tsi_interface", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index 743b34f11a..8dc4758d23 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1132,6 +1132,7 @@ add_library(grpc src/core/tsi/fake_transport_security.c src/core/tsi/gts_transport_security.c src/core/tsi/ssl_transport_security.c + src/core/tsi/transport_security_grpc.c src/core/tsi/transport_security.c src/core/tsi/transport_security_adapter.c src/core/ext/transport/chttp2/server/chttp2_server.c @@ -1503,6 +1504,7 @@ add_library(grpc_cronet src/core/tsi/fake_transport_security.c src/core/tsi/gts_transport_security.c src/core/tsi/ssl_transport_security.c + src/core/tsi/transport_security_grpc.c src/core/tsi/transport_security.c src/core/tsi/transport_security_adapter.c src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -3079,6 +3079,7 @@ LIBGRPC_SRC = \ src/core/tsi/fake_transport_security.c \ src/core/tsi/gts_transport_security.c \ src/core/tsi/ssl_transport_security.c \ + src/core/tsi/transport_security_grpc.c \ src/core/tsi/transport_security.c \ src/core/tsi/transport_security_adapter.c \ src/core/ext/transport/chttp2/server/chttp2_server.c \ @@ -3448,6 +3449,7 @@ LIBGRPC_CRONET_SRC = \ src/core/tsi/fake_transport_security.c \ src/core/tsi/gts_transport_security.c \ src/core/tsi/ssl_transport_security.c \ + src/core/tsi/transport_security_grpc.c \ src/core/tsi/transport_security.c \ src/core/tsi/transport_security_adapter.c \ src/core/ext/transport/chttp2/client/chttp2_connector.c \ @@ -19721,6 +19723,7 @@ src/core/tsi/gts_transport_security.c: $(OPENSSL_DEP) src/core/tsi/ssl_transport_security.c: $(OPENSSL_DEP) src/core/tsi/transport_security.c: $(OPENSSL_DEP) src/core/tsi/transport_security_adapter.c: $(OPENSSL_DEP) +src/core/tsi/transport_security_grpc.c: $(OPENSSL_DEP) src/cpp/client/cronet_credentials.cc: $(OPENSSL_DEP) src/cpp/client/secure_credentials.cc: $(OPENSSL_DEP) src/cpp/common/auth_property_iterator.cc: $(OPENSSL_DEP) diff --git a/binding.gyp b/binding.gyp index 2ca36e8f69..bbefd05c20 100644 --- a/binding.gyp +++ b/binding.gyp @@ -841,6 +841,7 @@ 'src/core/tsi/fake_transport_security.c', 'src/core/tsi/gts_transport_security.c', 'src/core/tsi/ssl_transport_security.c', + 'src/core/tsi/transport_security_grpc.c', 'src/core/tsi/transport_security.c', 'src/core/tsi/transport_security_adapter.c', 'src/core/ext/transport/chttp2/server/chttp2_server.c', diff --git a/build.yaml b/build.yaml index f57119a87c..a459b9d688 100644 --- a/build.yaml +++ b/build.yaml @@ -925,22 +925,33 @@ filegroups: - src/core/tsi/gts_transport_security.h - src/core/tsi/ssl_transport_security.h - src/core/tsi/ssl_types.h - - src/core/tsi/transport_security.h - - src/core/tsi/transport_security_adapter.h - - src/core/tsi/transport_security_interface.h + - src/core/tsi/transport_security_grpc.h src: - src/core/tsi/fake_transport_security.c - src/core/tsi/gts_transport_security.c - src/core/tsi/ssl_transport_security.c + - src/core/tsi/transport_security_grpc.c + deps: + - gpr + plugin: grpc_tsi_gts + secure: true + uses: + - tsi_interface + - grpc_base + - grpc_trace +- name: tsi_interface + headers: + - src/core/tsi/transport_security.h + - src/core/tsi/transport_security_adapter.h + - src/core/tsi/transport_security_interface.h + src: - src/core/tsi/transport_security.c - src/core/tsi/transport_security_adapter.c deps: - gpr - plugin: grpc_tsi_gts secure: true uses: - grpc_trace - - grpc_base - name: grpc++_codegen_base language: c++ public_headers: @@ -270,6 +270,7 @@ if test "$PHP_GRPC" != "no"; then src/core/tsi/fake_transport_security.c \ src/core/tsi/gts_transport_security.c \ src/core/tsi/ssl_transport_security.c \ + src/core/tsi/transport_security_grpc.c \ src/core/tsi/transport_security.c \ src/core/tsi/transport_security_adapter.c \ src/core/ext/transport/chttp2/server/chttp2_server.c \ diff --git a/config.w32 b/config.w32 index 03e83b09e8..1d1a0a4b63 100644 --- a/config.w32 +++ b/config.w32 @@ -247,6 +247,7 @@ if (PHP_GRPC != "no") { "src\\core\\tsi\\fake_transport_security.c " + "src\\core\\tsi\\gts_transport_security.c " + "src\\core\\tsi\\ssl_transport_security.c " + + "src\\core\\tsi\\transport_security_grpc.c " + "src\\core\\tsi\\transport_security.c " + "src\\core\\tsi\\transport_security_adapter.c " + "src\\core\\ext\\transport\\chttp2\\server\\chttp2_server.c " + diff --git a/doc/load-balancing.md b/doc/load-balancing.md index f56d2b0c73..88ff35496f 100644 --- a/doc/load-balancing.md +++ b/doc/load-balancing.md @@ -113,8 +113,8 @@ works: that indicates which client-side load-balancing policy to use (e.g., `round_robin` or `grpclb`). 2. The client instantiates the load balancing policy. - - Note: If all addresses returned by the resolver are balancer - addresses, then the client will use the `grpclb` policy, regardless + - Note: If any one of the addresses returned by the resolver is a balancer + address, then the client will use the `grpclb` policy, regardless of what load-balancing policy was requested by the service config. Otherwise, the client will use the load-balancing policy requested by the service config. If no load-balancing policy is requested diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index dfd991b1c6..4b1a8f38a3 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -290,6 +290,7 @@ Pod::Spec.new do |s| 'src/core/tsi/gts_transport_security.h', 'src/core/tsi/ssl_transport_security.h', 'src/core/tsi/ssl_types.h', + 'src/core/tsi/transport_security_grpc.h', 'src/core/tsi/transport_security.h', 'src/core/tsi/transport_security_adapter.h', 'src/core/tsi/transport_security_interface.h', @@ -648,6 +649,7 @@ Pod::Spec.new do |s| 'src/core/tsi/fake_transport_security.c', 'src/core/tsi/gts_transport_security.c', 'src/core/tsi/ssl_transport_security.c', + 'src/core/tsi/transport_security_grpc.c', 'src/core/tsi/transport_security.c', 'src/core/tsi/transport_security_adapter.c', 'src/core/ext/transport/chttp2/server/chttp2_server.c', @@ -782,6 +784,7 @@ Pod::Spec.new do |s| 'src/core/tsi/gts_transport_security.h', 'src/core/tsi/ssl_transport_security.h', 'src/core/tsi/ssl_types.h', + 'src/core/tsi/transport_security_grpc.h', 'src/core/tsi/transport_security.h', 'src/core/tsi/transport_security_adapter.h', 'src/core/tsi/transport_security_interface.h', diff --git a/grpc.gemspec b/grpc.gemspec index 07ca5de73c..f04a14167b 100755..100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -222,6 +222,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/tsi/gts_transport_security.h ) s.files += %w( src/core/tsi/ssl_transport_security.h ) s.files += %w( src/core/tsi/ssl_types.h ) + s.files += %w( src/core/tsi/transport_security_grpc.h ) s.files += %w( src/core/tsi/transport_security.h ) s.files += %w( src/core/tsi/transport_security_adapter.h ) s.files += %w( src/core/tsi/transport_security_interface.h ) @@ -580,6 +581,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/tsi/fake_transport_security.c ) s.files += %w( src/core/tsi/gts_transport_security.c ) s.files += %w( src/core/tsi/ssl_transport_security.c ) + s.files += %w( src/core/tsi/transport_security_grpc.c ) s.files += %w( src/core/tsi/transport_security.c ) s.files += %w( src/core/tsi/transport_security_adapter.c ) s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.c ) diff --git a/include/grpc/impl/codegen/byte_buffer_reader.h b/include/grpc/impl/codegen/byte_buffer_reader.h index 2ae3f1e20e..dc0f15496f 100644 --- a/include/grpc/impl/codegen/byte_buffer_reader.h +++ b/include/grpc/impl/codegen/byte_buffer_reader.h @@ -29,7 +29,7 @@ struct grpc_byte_buffer_reader { struct grpc_byte_buffer *buffer_in; struct grpc_byte_buffer *buffer_out; /** Different current objects correspond to different types of byte buffers */ - union { + union grpc_byte_buffer_reader_current { /** Index into a slice buffer's array of slices */ unsigned index; } current; diff --git a/include/grpc/impl/codegen/compression_types.h b/include/grpc/impl/codegen/compression_types.h index e39c13e88d..f1b2de3f7d 100644 --- a/include/grpc/impl/codegen/compression_types.h +++ b/include/grpc/impl/codegen/compression_types.h @@ -84,7 +84,7 @@ typedef struct grpc_compression_options { * behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL. If present, takes * precedence over \a default_algorithm. * TODO(dgq): currently only available for server channels. */ - struct { + struct grpc_compression_options_default_level { int is_set; grpc_compression_level level; } default_level; @@ -92,7 +92,7 @@ typedef struct grpc_compression_options { /** The default channel compression algorithm. It'll be used in the absence of * call specific settings. This option corresponds to the channel argument key * behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM. */ - struct { + struct grpc_compression_options_default_algorithm { int is_set; grpc_compression_algorithm algorithm; } default_algorithm; diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 2b2036b24f..8813ec8f35 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -41,11 +41,11 @@ typedef enum { typedef struct grpc_byte_buffer { void *reserved; grpc_byte_buffer_type type; - union { - struct { + union grpc_byte_buffer_data { + struct /* internal */ { void *reserved[8]; } reserved; - struct { + struct grpc_compressed_buffer { grpc_compression_algorithm compression; grpc_slice_buffer slice_buffer; } raw; @@ -104,10 +104,10 @@ typedef struct grpc_arg_pointer_vtable { typedef struct { grpc_arg_type type; char *key; - union { + union grpc_arg_value { char *string; int integer; - struct { + struct grpc_arg_pointer { void *p; const grpc_arg_pointer_vtable *vtable; } pointer; @@ -391,7 +391,7 @@ typedef struct grpc_metadata { /** The following fields are reserved for grpc internal use. There is no need to initialize them, and they will be set to garbage during calls to grpc. */ - struct { + struct /* internal */ { void *obfuscated[4]; } internal_data; } grpc_metadata; @@ -491,25 +491,25 @@ typedef struct grpc_op { uint32_t flags; /** Reserved for future usage */ void *reserved; - union { + union grpc_op_data { /** Reserved for future usage */ - struct { + struct /* internal */ { void *reserved[8]; } reserved; - struct { + struct grpc_op_send_initial_metadata { size_t count; grpc_metadata *metadata; /** If \a is_set, \a compression_level will be used for the call. * Otherwise, \a compression_level won't be considered */ - struct { + struct grpc_op_send_initial_metadata_maybe_compression_level { uint8_t is_set; grpc_compression_level level; } maybe_compression_level; } send_initial_metadata; - struct { + struct grpc_op_send_message { struct grpc_byte_buffer *send_message; } send_message; - struct { + struct grpc_op_send_status_from_server { size_t trailing_metadata_count; grpc_metadata *trailing_metadata; grpc_status_code status; @@ -523,16 +523,16 @@ typedef struct grpc_op { object, recv_initial_metadata->array is owned by the caller). After the operation completes, call grpc_metadata_array_destroy on this value, or reuse it in a future op. */ - struct { + struct grpc_op_recv_initial_metadata { grpc_metadata_array *recv_initial_metadata; } recv_initial_metadata; /** ownership of the byte buffer is moved to the caller; the caller must call grpc_byte_buffer_destroy on this value, or reuse it in a future op. */ - struct { + struct grpc_op_recv_message { struct grpc_byte_buffer **recv_message; } recv_message; - struct { + struct grpc_op_recv_status_on_client { /** ownership of the array is with the caller, but ownership of the elements stays with the call object (ie key, value members are owned by the call object, trailing_metadata->array is owned by the caller). @@ -542,7 +542,7 @@ typedef struct grpc_op { grpc_status_code *status; grpc_slice *status_details; } recv_status_on_client; - struct { + struct grpc_op_recv_close_on_server { /** out argument, set to 1 if the call failed in any way (seen as a cancellation on the server), or 0 if the call succeeded */ int *cancelled; diff --git a/include/grpc/impl/codegen/slice.h b/include/grpc/impl/codegen/slice.h index 5ec439eb37..a04c683a55 100644 --- a/include/grpc/impl/codegen/slice.h +++ b/include/grpc/impl/codegen/slice.h @@ -75,12 +75,12 @@ typedef struct grpc_slice_refcount { of data that is copied by value. */ struct grpc_slice { struct grpc_slice_refcount *refcount; - union { - struct { + union grpc_slice_data { + struct grpc_slice_refcounted { uint8_t *bytes; size_t length; } refcounted; - struct { + struct grpc_slice_inlined { uint8_t length; uint8_t bytes[GRPC_SLICE_INLINED_SIZE]; } inlined; diff --git a/package.xml b/package.xml index 9d3fb1b5de..4e288b3c68 100644 --- a/package.xml +++ b/package.xml @@ -236,6 +236,7 @@ <file baseinstalldir="/" name="src/core/tsi/gts_transport_security.h" role="src" /> <file baseinstalldir="/" name="src/core/tsi/ssl_transport_security.h" role="src" /> <file baseinstalldir="/" name="src/core/tsi/ssl_types.h" role="src" /> + <file baseinstalldir="/" name="src/core/tsi/transport_security_grpc.h" role="src" /> <file baseinstalldir="/" name="src/core/tsi/transport_security.h" role="src" /> <file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.h" role="src" /> <file baseinstalldir="/" name="src/core/tsi/transport_security_interface.h" role="src" /> @@ -594,6 +595,7 @@ <file baseinstalldir="/" name="src/core/tsi/fake_transport_security.c" role="src" /> <file baseinstalldir="/" name="src/core/tsi/gts_transport_security.c" role="src" /> <file baseinstalldir="/" name="src/core/tsi/ssl_transport_security.c" role="src" /> + <file baseinstalldir="/" name="src/core/tsi/transport_security_grpc.c" role="src" /> <file baseinstalldir="/" name="src/core/tsi/transport_security.c" role="src" /> <file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/chttp2_server.c" role="src" /> diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c index 543a73c5ad..823c681abf 100644 --- a/src/core/ext/census/tracing.c +++ b/src/core/ext/census/tracing.c @@ -21,7 +21,6 @@ #include <grpc/census.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <openssl/rand.h> #include "src/core/ext/census/mlog.h" void trace_start_span(const trace_span_context *span_ctxt, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 365aa583bb..9472a8e520 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -42,6 +42,7 @@ #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/support/murmur_hash.h" #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) @@ -239,22 +240,43 @@ struct grpc_pollset_set { * condition variable polling definitions */ +#define POLLCV_THREAD_GRACE_MS 1000 #define CV_POLL_PERIOD_MS 1000 #define CV_DEFAULT_TABLE_SIZE 16 -typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t; - -typedef struct poll_args { +typedef struct poll_result { gpr_refcount refcount; - gpr_cv *cv; + cv_node *watchers; + int watchcount; struct pollfd *fds; nfds_t nfds; - int timeout; int retval; int err; - gpr_atm status; + int completed; +} poll_result; + +typedef struct poll_args { + gpr_cv trigger; + int trigger_set; + struct pollfd *fds; + nfds_t nfds; + poll_result *result; + struct poll_args *next; + struct poll_args *prev; } poll_args; +// This is a 2-tiered cache, we mantain a hash table +// of active poll calls, so we can wait on the result +// of that call. We also maintain a freelist of inactive +// poll threads. +typedef struct poll_hash_table { + poll_args *free_pollers; + poll_args **active_pollers; + unsigned int size; + unsigned int count; +} poll_hash_table; + +poll_hash_table poll_cache; cv_fd_table g_cvfds; /******************************************************************************* @@ -1277,43 +1299,205 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, * Condition Variable polling extensions */ -static void decref_poll_args(poll_args *args) { - if (gpr_unref(&args->refcount)) { - gpr_free(args->fds); - gpr_cv_destroy(args->cv); - gpr_free(args->cv); - gpr_free(args); +static void run_poll(void *args); +static void cache_poller_locked(poll_args *args); + +static void cache_insert_locked(poll_args *args) { + uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd), + 0xDEADBEEF); + key = key % poll_cache.size; + if (poll_cache.active_pollers[key]) { + poll_cache.active_pollers[key]->prev = args; } + args->next = poll_cache.active_pollers[key]; + args->prev = NULL; + poll_cache.active_pollers[key] = args; + poll_cache.count++; } -// Poll in a background thread -static void run_poll(void *arg) { - int timeout, retval; - poll_args *pargs = (poll_args *)arg; - while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) { - if (pargs->timeout < 0) { - timeout = CV_POLL_PERIOD_MS; - } else { - timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout); - pargs->timeout -= timeout; +static void init_result(poll_args *pargs) { + pargs->result = gpr_malloc(sizeof(poll_result)); + gpr_ref_init(&pargs->result->refcount, 1); + pargs->result->watchers = NULL; + pargs->result->watchcount = 0; + pargs->result->fds = gpr_malloc(sizeof(struct pollfd) * pargs->nfds); + memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds); + pargs->result->nfds = pargs->nfds; + pargs->result->retval = 0; + pargs->result->err = 0; + pargs->result->completed = 0; +} + +// Creates a poll_args object for a given arguments to poll(). +// This object may return a poll_args in the cache. +static poll_args *get_poller_locked(struct pollfd *fds, nfds_t count) { + uint32_t key = + gpr_murmur_hash3(fds, count * sizeof(struct pollfd), 0xDEADBEEF); + key = key % poll_cache.size; + poll_args *curr = poll_cache.active_pollers[key]; + while (curr) { + if (curr->nfds == count && + memcmp(curr->fds, fds, count * sizeof(struct pollfd)) == 0) { + gpr_free(fds); + return curr; } - retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout); - if (retval != 0 || pargs->timeout == 0) { - pargs->retval = retval; - pargs->err = errno; - break; + curr = curr->next; + } + + if (poll_cache.free_pollers) { + poll_args *pargs = poll_cache.free_pollers; + poll_cache.free_pollers = pargs->next; + if (poll_cache.free_pollers) { + poll_cache.free_pollers->prev = NULL; } + pargs->fds = fds; + pargs->nfds = count; + pargs->next = NULL; + pargs->prev = NULL; + init_result(pargs); + cache_poller_locked(pargs); + return pargs; + } + + poll_args *pargs = gpr_malloc(sizeof(struct poll_args)); + gpr_cv_init(&pargs->trigger); + pargs->fds = fds; + pargs->nfds = count; + pargs->next = NULL; + pargs->prev = NULL; + pargs->trigger_set = 0; + init_result(pargs); + cache_poller_locked(pargs); + gpr_thd_id t_id; + gpr_thd_options opt = gpr_thd_options_default(); + gpr_ref(&g_cvfds.pollcount); + gpr_thd_options_set_detached(&opt); + GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt)); + return pargs; +} + +static void cache_delete_locked(poll_args *args) { + if (!args->prev) { + uint32_t key = gpr_murmur_hash3( + args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF); + key = key % poll_cache.size; + GPR_ASSERT(poll_cache.active_pollers[key] == args); + poll_cache.active_pollers[key] = args->next; + } else { + args->prev->next = args->next; } - gpr_mu_lock(&g_cvfds.mu); - if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) { - // Signal main thread that the poll completed - gpr_atm_no_barrier_store(&pargs->status, COMPLETED); - gpr_cv_signal(pargs->cv); + + if (args->next) { + args->next->prev = args->prev; } - decref_poll_args(pargs); - g_cvfds.pollcount--; - if (g_cvfds.shutdown && g_cvfds.pollcount == 0) { - gpr_cv_signal(&g_cvfds.shutdown_complete); + + poll_cache.count--; + if (poll_cache.free_pollers) { + poll_cache.free_pollers->prev = args; + } + args->prev = NULL; + args->next = poll_cache.free_pollers; + gpr_free(args->fds); + poll_cache.free_pollers = args; +} + +static void cache_poller_locked(poll_args *args) { + if (poll_cache.count + 1 > poll_cache.size / 2) { + poll_args **old_active_pollers = poll_cache.active_pollers; + poll_cache.size = poll_cache.size * 2; + poll_cache.count = 0; + poll_cache.active_pollers = gpr_malloc(sizeof(void *) * poll_cache.size); + for (unsigned int i = 0; i < poll_cache.size; i++) { + poll_cache.active_pollers[i] = NULL; + } + for (unsigned int i = 0; i < poll_cache.size / 2; i++) { + poll_args *curr = old_active_pollers[i]; + poll_args *next = NULL; + while (curr) { + next = curr->next; + cache_insert_locked(curr); + curr = next; + } + } + gpr_free(old_active_pollers); + } + + cache_insert_locked(args); +} + +static void cache_destroy_locked(poll_args *args) { + if (args->next) { + args->next->prev = args->prev; + } + + if (args->prev) { + args->prev->next = args->next; + } else { + poll_cache.free_pollers = args->next; + } + + gpr_free(args); +} + +static void decref_poll_result(poll_result *res) { + if (gpr_unref(&res->refcount)) { + GPR_ASSERT(!res->watchers); + gpr_free(res->fds); + gpr_free(res); + } +} + +void remove_cvn(cv_node **head, cv_node *target) { + if (target->next) { + target->next->prev = target->prev; + } + + if (target->prev) { + target->prev->next = target->next; + } else { + *head = target->next; + } +} + +gpr_timespec thread_grace; + +// Poll in a background thread +static void run_poll(void *args) { + poll_args *pargs = (poll_args *)args; + while (1) { + poll_result *result = pargs->result; + int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS); + gpr_mu_lock(&g_cvfds.mu); + if (retval != 0) { + result->completed = 1; + result->retval = retval; + result->err = errno; + cv_node *watcher = result->watchers; + while (watcher) { + gpr_cv_signal(watcher->cv); + watcher = watcher->next; + } + } + if (result->watchcount == 0 || result->completed) { + cache_delete_locked(pargs); + decref_poll_result(result); + // Leave this polling thread alive for a grace period to do another poll() + // op + gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + deadline = gpr_time_add(deadline, thread_grace); + pargs->trigger_set = 0; + gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline); + if (!pargs->trigger_set) { + cache_destroy_locked(pargs); + break; + } + } + gpr_mu_unlock(&g_cvfds.mu); + } + + // We still have the lock here + if (gpr_unref(&g_cvfds.pollcount)) { + gpr_cv_signal(&g_cvfds.shutdown_cv); } gpr_mu_unlock(&g_cvfds.mu); } @@ -1322,24 +1506,29 @@ static void run_poll(void *arg) { static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { unsigned int i; int res, idx; - gpr_cv *pollcv; - cv_node *cvn, *prev; + cv_node *pollcv; int skip_poll = 0; nfds_t nsockfds = 0; - gpr_thd_id t_id; - gpr_thd_options opt; - poll_args *pargs = NULL; + poll_result *result = NULL; gpr_mu_lock(&g_cvfds.mu); - pollcv = gpr_malloc(sizeof(gpr_cv)); - gpr_cv_init(pollcv); + pollcv = gpr_malloc(sizeof(cv_node)); + pollcv->next = NULL; + gpr_cv pollcv_cv; + gpr_cv_init(&pollcv_cv); + pollcv->cv = &pollcv_cv; + cv_node *fd_cvs = gpr_malloc(nfds * sizeof(cv_node)); + for (i = 0; i < nfds; i++) { fds[i].revents = 0; if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { idx = FD_TO_IDX(fds[i].fd); - cvn = gpr_malloc(sizeof(cv_node)); - cvn->cv = pollcv; - cvn->next = g_cvfds.cvfds[idx].cvs; - g_cvfds.cvfds[idx].cvs = cvn; + fd_cvs[i].cv = &pollcv_cv; + fd_cvs[i].prev = NULL; + fd_cvs[i].next = g_cvfds.cvfds[idx].cvs; + if (g_cvfds.cvfds[idx].cvs) { + g_cvfds.cvfds[idx].cvs->prev = &(fd_cvs[i]); + } + g_cvfds.cvfds[idx].cvs = &(fd_cvs[i]); // Don't bother polling if a wakeup fd is ready if (g_cvfds.cvfds[idx].is_set) { skip_poll = 1; @@ -1349,81 +1538,68 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { } } + gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + if (timeout < 0) { + deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + } else { + deadline = + gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); + } + res = 0; if (!skip_poll && nsockfds > 0) { - pargs = gpr_malloc(sizeof(struct poll_args)); - // Both the main thread and calling thread get a reference - gpr_ref_init(&pargs->refcount, 2); - pargs->cv = pollcv; - pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds); - pargs->nfds = nsockfds; - pargs->timeout = timeout; - pargs->retval = 0; - pargs->err = 0; - gpr_atm_no_barrier_store(&pargs->status, INPROGRESS); + struct pollfd *pollfds = gpr_malloc(sizeof(struct pollfd) * nsockfds); idx = 0; for (i = 0; i < nfds; i++) { if (fds[i].fd >= 0) { - pargs->fds[idx].fd = fds[i].fd; - pargs->fds[idx].events = fds[i].events; - pargs->fds[idx].revents = 0; + pollfds[idx].fd = fds[i].fd; + pollfds[idx].events = fds[i].events; + pollfds[idx].revents = 0; idx++; } } - g_cvfds.pollcount++; - opt = gpr_thd_options_default(); - gpr_thd_options_set_detached(&opt); - GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt)); - // We want the poll() thread to trigger the deadline, so wait forever here - gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); - if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { - res = pargs->retval; - errno = pargs->err; - } else { - errno = 0; - gpr_atm_no_barrier_store(&pargs->status, CANCELLED); + poll_args *pargs = get_poller_locked(pollfds, nsockfds); + result = pargs->result; + pollcv->next = result->watchers; + pollcv->prev = NULL; + if (result->watchers) { + result->watchers->prev = pollcv; } + result->watchers = pollcv; + result->watchcount++; + gpr_ref(&result->refcount); + + pargs->trigger_set = 1; + gpr_cv_signal(&pargs->trigger); + gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); + res = result->retval; + errno = result->err; + result->watchcount--; + remove_cvn(&result->watchers, pollcv); } else if (!skip_poll) { - gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); - deadline = - gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); - gpr_cv_wait(pollcv, &g_cvfds.mu, deadline); + gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); } idx = 0; for (i = 0; i < nfds; i++) { if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs; - prev = NULL; - while (cvn->cv != pollcv) { - prev = cvn; - cvn = cvn->next; - GPR_ASSERT(cvn); - } - if (!prev) { - g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next; - } else { - prev->next = cvn->next; - } - gpr_free(cvn); - + remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i])); if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { fds[i].revents = POLLIN; if (res >= 0) res++; } - } else if (!skip_poll && fds[i].fd >= 0 && - gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { - fds[i].revents = pargs->fds[idx].revents; + } else if (!skip_poll && fds[i].fd >= 0 && result->completed) { + fds[i].revents = result->fds[idx].revents; idx++; } } - if (pargs) { - decref_poll_args(pargs); - } else { - gpr_cv_destroy(pollcv); - gpr_free(pollcv); + gpr_free(fd_cvs); + gpr_free(pollcv); + if (result) { + decref_poll_result(result); } + gpr_mu_unlock(&g_cvfds.mu); return res; @@ -1432,12 +1608,12 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { static void global_cv_fd_table_init() { gpr_mu_init(&g_cvfds.mu); gpr_mu_lock(&g_cvfds.mu); - gpr_cv_init(&g_cvfds.shutdown_complete); - g_cvfds.shutdown = 0; - g_cvfds.pollcount = 0; + gpr_cv_init(&g_cvfds.shutdown_cv); + gpr_ref_init(&g_cvfds.pollcount, 1); g_cvfds.size = CV_DEFAULT_TABLE_SIZE; g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); g_cvfds.free_fds = NULL; + thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN); for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) { g_cvfds.cvfds[i].is_set = 0; g_cvfds.cvfds[i].cvs = NULL; @@ -1447,23 +1623,35 @@ static void global_cv_fd_table_init() { // Override the poll function with one that supports cvfds g_cvfds.poll = grpc_poll_function; grpc_poll_function = &cvfd_poll; + + // Initialize the cache + poll_cache.size = 32; + poll_cache.count = 0; + poll_cache.free_pollers = NULL; + poll_cache.active_pollers = gpr_malloc(sizeof(void *) * 32); + for (unsigned int i = 0; i < poll_cache.size; i++) { + poll_cache.active_pollers[i] = NULL; + } + gpr_mu_unlock(&g_cvfds.mu); } static void global_cv_fd_table_shutdown() { gpr_mu_lock(&g_cvfds.mu); - g_cvfds.shutdown = 1; // Attempt to wait for all abandoned poll() threads to terminate // Not doing so will result in reported memory leaks - if (g_cvfds.pollcount > 0) { - int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu, + if (!gpr_unref(&g_cvfds.pollcount)) { + int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(3, GPR_TIMESPAN))); GPR_ASSERT(res == 0); } - gpr_cv_destroy(&g_cvfds.shutdown_complete); + gpr_cv_destroy(&g_cvfds.shutdown_cv); grpc_poll_function = g_cvfds.poll; gpr_free(g_cvfds.cvfds); + + gpr_free(poll_cache.active_pollers); + gpr_mu_unlock(&g_cvfds.mu); gpr_mu_destroy(&g_cvfds.mu); } diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 833170ceed..41c69add17 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -51,33 +51,6 @@ bool grpc_exec_ctx_has_work(grpc_exec_ctx *exec_ctx) { !grpc_closure_list_empty(exec_ctx->closure_list); } -bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { - bool did_something = 0; - GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); - for (;;) { - if (!grpc_closure_list_empty(exec_ctx->closure_list)) { - grpc_closure *c = exec_ctx->closure_list.head; - exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error_data.error; - did_something = true; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - } - } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) { - break; - } - } - GPR_ASSERT(exec_ctx->active_combiner == NULL); - GPR_TIMER_END("grpc_exec_ctx_flush", 0); - return did_something; -} - void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { exec_ctx->flags |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; grpc_exec_ctx_flush(exec_ctx); @@ -103,6 +76,29 @@ static void exec_ctx_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure, GRPC_ERROR_UNREF(error); } +bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { + bool did_something = 0; + GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); + for (;;) { + if (!grpc_closure_list_empty(exec_ctx->closure_list)) { + grpc_closure *c = exec_ctx->closure_list.head; + exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + did_something = true; + exec_ctx_run(exec_ctx, c, error); + c = next; + } + } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) { + break; + } + } + GPR_ASSERT(exec_ctx->active_combiner == NULL); + GPR_TIMER_END("grpc_exec_ctx_flush", 0); + return did_something; +} + static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error) { grpc_closure_list_append(&exec_ctx->closure_list, closure, error); diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index c5dcdc9746..46e84f5843 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -43,6 +43,7 @@ typedef struct cv_node { gpr_cv* cv; struct cv_node* next; + struct cv_node* prev; } cv_node; typedef struct fd_node { @@ -53,9 +54,8 @@ typedef struct fd_node { typedef struct cv_fd_table { gpr_mu mu; - int pollcount; - int shutdown; - gpr_cv shutdown_complete; + gpr_refcount pollcount; + gpr_cv shutdown_cv; fd_node* cvfds; fd_node* free_fds; unsigned int size; diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index b9da6e16b2..fc9c9f980f 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -261,7 +261,7 @@ static grpc_error *do_handshaker_next_locked( grpc_exec_ctx *exec_ctx, security_handshaker *h, const unsigned char *bytes_received, size_t bytes_received_size) { // Invoke TSI handshaker. - unsigned char *bytes_to_send = NULL; + const unsigned char *bytes_to_send = NULL; size_t bytes_to_send_size = 0; tsi_handshaker_result *handshaker_result = NULL; tsi_result result = tsi_handshaker_next( diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 3d82a32e82..c20cfbc740 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -235,7 +235,8 @@ typedef struct cq_next_data { /* Number of outstanding events (+1 if not shut down) */ gpr_atm pending_events; - int shutdown_called; + /** 0 initially. 1 once we initiated shutdown */ + bool shutdown_called; } cq_next_data; typedef struct cq_pluck_data { @@ -244,15 +245,20 @@ typedef struct cq_pluck_data { grpc_cq_completion *completed_tail; /** Number of pending events (+1 if we're not shutdown) */ - gpr_refcount pending_events; + gpr_atm pending_events; /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; - /** 0 initially, 1 once we've begun shutting down */ + /** 0 initially. 1 once we completed shutting */ + /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if + * (pending_events == 0). So consider removing this in future and use + * pending_events */ gpr_atm shutdown; - int shutdown_called; + + /** 0 initially. 1 once we initiated shutdown */ + bool shutdown_called; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; @@ -436,7 +442,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal( static void cq_init_next(void *ptr) { cq_next_data *cqd = ptr; - /* Initial ref is dropped by grpc_completion_queue_shutdown */ + /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); cqd->shutdown_called = false; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); @@ -451,12 +457,12 @@ static void cq_destroy_next(void *ptr) { static void cq_init_pluck(void *ptr) { cq_pluck_data *cqd = ptr; - /* Initial ref is dropped by grpc_completion_queue_shutdown */ - gpr_ref_init(&cqd->pending_events, 1); + /* Initial count is dropped by grpc_completion_queue_shutdown */ + gpr_atm_no_barrier_store(&cqd->pending_events, 1); cqd->completed_tail = &cqd->completed_head; cqd->completed_head.next = (uintptr_t)cqd->completed_tail; gpr_atm_no_barrier_store(&cqd->shutdown, 0); - cqd->shutdown_called = 0; + cqd->shutdown_called = false; cqd->num_pluckers = 0; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); } @@ -549,24 +555,32 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} #endif -static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { - cq_next_data *cqd = DATA_FROM_CQ(cq); +/* Atomically increments a counter only if the counter is not zero. Returns + * true if the increment was successful; false if the counter is zero */ +static bool atm_inc_if_nonzero(gpr_atm *counter) { while (true) { - gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events); + gpr_atm count = gpr_atm_no_barrier_load(counter); + /* If zero, we are done. If not, we must to a CAS (instead of an atomic + * increment) to maintain the contract: do not increment the counter if it + * is zero. */ if (count == 0) { return false; - } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) { + } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) { break; } } + return true; } +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + return atm_inc_if_nonzero(&cqd->pending_events); +} + static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { cq_pluck_data *cqd = DATA_FROM_CQ(cq); - GPR_ASSERT(!cqd->shutdown_called); - gpr_ref(&cqd->pending_events); - return true; + return atm_inc_if_nonzero(&cqd->pending_events); } bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { @@ -704,8 +718,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, ((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next); cqd->completed_tail = storage; - int shutdown = gpr_unref(&cqd->pending_events); - if (!shutdown) { + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cq_finish_shutdown_pluck(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + } else { grpc_pollset_worker *pluck_worker = NULL; for (int i = 0; i < cqd->num_pluckers; i++) { if (cqd->pluckers[i].tag == tag) { @@ -725,9 +741,6 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(kick_error); } - } else { - cq_finish_shutdown_pluck(exec_ctx, cq); - gpr_mu_unlock(cq->mu); } GPR_TIMER_END("cq_end_op_for_pluck", 0); @@ -952,6 +965,12 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { cq_next_data *cqd = DATA_FROM_CQ(cq); + /* Need an extra ref for cq here because: + * We call cq_finish_shutdown_next() below, that would call pollset shutdown. + * Pollset shutdown decrements the cq ref count which can potentially destroy + * the cq (if that happens to be the last ref). + * Creating an extra ref here prevents the cq from getting destroyed while + * this function is still active */ GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { @@ -960,7 +979,7 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } - cqd->shutdown_called = 1; + cqd->shutdown_called = true; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { cq_finish_shutdown_next(exec_ctx, cq); } @@ -1172,21 +1191,32 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, &cq->pollset_shutdown_done); } +/* NOTE: This function is almost exactly identical to cq_shutdown_next() but + * merging them is a bit tricky and probably not worth it */ static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { cq_pluck_data *cqd = DATA_FROM_CQ(cq); + /* Need an extra ref for cq here because: + * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown. + * Pollset shutdown decrements the cq ref count which can potentially destroy + * the cq (if that happens to be the last ref). + * Creating an extra ref here prevents the cq from getting destroyed while + * this function is still active */ + GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)"); gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } - cqd->shutdown_called = 1; - if (gpr_unref(&cqd->pending_events)) { + cqd->shutdown_called = true; + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { cq_finish_shutdown_pluck(exec_ctx, cq); } gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); } /* Shutdown simply drops a ref that we reserved at creation time; if we drop diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index 810447313c..967126ecee 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -407,8 +407,10 @@ static void fake_handshaker_result_destroy(tsi_handshaker_result *self) { static const tsi_handshaker_result_vtable handshaker_result_vtable = { fake_handshaker_result_extract_peer, + NULL, /* create_zero_copy_grpc_protector */ fake_handshaker_result_create_frame_protector, - fake_handshaker_result_get_unused_bytes, fake_handshaker_result_destroy, + fake_handshaker_result_get_unused_bytes, + fake_handshaker_result_destroy, }; static tsi_result fake_handshaker_result_create( @@ -530,7 +532,7 @@ static void fake_handshaker_destroy(tsi_handshaker *self) { static tsi_result fake_handshaker_next( tsi_handshaker *self, const unsigned char *received_bytes, - size_t received_bytes_size, unsigned char **bytes_to_send, + size_t received_bytes_size, const unsigned char **bytes_to_send, size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result, tsi_handshaker_on_next_done_cb cb, void *user_data) { /* Sanity check the arguments. */ diff --git a/src/core/tsi/transport_security.c b/src/core/tsi/transport_security.c index 2b1f4310c1..76213072a3 100644 --- a/src/core/tsi/transport_security.c +++ b/src/core/tsi/transport_security.c @@ -74,14 +74,12 @@ tsi_result tsi_frame_protector_protect(tsi_frame_protector *self, size_t *unprotected_bytes_size, unsigned char *protected_output_frames, size_t *protected_output_frames_size) { - if (self == NULL || unprotected_bytes == NULL || + if (self == NULL || self->vtable == NULL || unprotected_bytes == NULL || unprotected_bytes_size == NULL || protected_output_frames == NULL || protected_output_frames_size == NULL) { return TSI_INVALID_ARGUMENT; } - if (self->vtable == NULL || self->vtable->protect == NULL) { - return TSI_UNIMPLEMENTED; - } + if (self->vtable->protect == NULL) return TSI_UNIMPLEMENTED; return self->vtable->protect(self, unprotected_bytes, unprotected_bytes_size, protected_output_frames, protected_output_frames_size); @@ -90,13 +88,11 @@ tsi_result tsi_frame_protector_protect(tsi_frame_protector *self, tsi_result tsi_frame_protector_protect_flush( tsi_frame_protector *self, unsigned char *protected_output_frames, size_t *protected_output_frames_size, size_t *still_pending_size) { - if (self == NULL || protected_output_frames == NULL || + if (self == NULL || self->vtable == NULL || protected_output_frames == NULL || protected_output_frames_size == NULL || still_pending_size == NULL) { return TSI_INVALID_ARGUMENT; } - if (self->vtable == NULL || self->vtable->protect_flush == NULL) { - return TSI_UNIMPLEMENTED; - } + if (self->vtable->protect_flush == NULL) return TSI_UNIMPLEMENTED; return self->vtable->protect_flush(self, protected_output_frames, protected_output_frames_size, still_pending_size); @@ -106,14 +102,12 @@ tsi_result tsi_frame_protector_unprotect( tsi_frame_protector *self, const unsigned char *protected_frames_bytes, size_t *protected_frames_bytes_size, unsigned char *unprotected_bytes, size_t *unprotected_bytes_size) { - if (self == NULL || protected_frames_bytes == NULL || + if (self == NULL || self->vtable == NULL || protected_frames_bytes == NULL || protected_frames_bytes_size == NULL || unprotected_bytes == NULL || unprotected_bytes_size == NULL) { return TSI_INVALID_ARGUMENT; } - if (self->vtable == NULL || self->vtable->unprotect == NULL) { - return TSI_UNIMPLEMENTED; - } + if (self->vtable->unprotect == NULL) return TSI_UNIMPLEMENTED; return self->vtable->unprotect(self, protected_frames_bytes, protected_frames_bytes_size, unprotected_bytes, unprotected_bytes_size); @@ -131,48 +125,44 @@ void tsi_frame_protector_destroy(tsi_frame_protector *self) { tsi_result tsi_handshaker_get_bytes_to_send_to_peer(tsi_handshaker *self, unsigned char *bytes, size_t *bytes_size) { - if (self == NULL || bytes == NULL || bytes_size == NULL) { + if (self == NULL || self->vtable == NULL || bytes == NULL || + bytes_size == NULL) { return TSI_INVALID_ARGUMENT; } if (self->frame_protector_created) return TSI_FAILED_PRECONDITION; - if (self->vtable == NULL || self->vtable->get_bytes_to_send_to_peer == NULL) { - return TSI_UNIMPLEMENTED; - } + if (self->vtable->get_bytes_to_send_to_peer == NULL) return TSI_UNIMPLEMENTED; return self->vtable->get_bytes_to_send_to_peer(self, bytes, bytes_size); } tsi_result tsi_handshaker_process_bytes_from_peer(tsi_handshaker *self, const unsigned char *bytes, size_t *bytes_size) { - if (self == NULL || bytes == NULL || bytes_size == NULL) { + if (self == NULL || self->vtable == NULL || bytes == NULL || + bytes_size == NULL) { return TSI_INVALID_ARGUMENT; } if (self->frame_protector_created) return TSI_FAILED_PRECONDITION; - if (self->vtable == NULL || self->vtable->process_bytes_from_peer == NULL) { - return TSI_UNIMPLEMENTED; - } + if (self->vtable->process_bytes_from_peer == NULL) return TSI_UNIMPLEMENTED; return self->vtable->process_bytes_from_peer(self, bytes, bytes_size); } tsi_result tsi_handshaker_get_result(tsi_handshaker *self) { - if (self == NULL) return TSI_INVALID_ARGUMENT; + if (self == NULL || self->vtable == NULL) return TSI_INVALID_ARGUMENT; if (self->frame_protector_created) return TSI_FAILED_PRECONDITION; - if (self->vtable == NULL || self->vtable->get_result == NULL) { - return TSI_UNIMPLEMENTED; - } + if (self->vtable->get_result == NULL) return TSI_UNIMPLEMENTED; return self->vtable->get_result(self); } tsi_result tsi_handshaker_extract_peer(tsi_handshaker *self, tsi_peer *peer) { - if (self == NULL || peer == NULL) return TSI_INVALID_ARGUMENT; + if (self == NULL || self->vtable == NULL || peer == NULL) { + return TSI_INVALID_ARGUMENT; + } memset(peer, 0, sizeof(tsi_peer)); if (self->frame_protector_created) return TSI_FAILED_PRECONDITION; if (tsi_handshaker_get_result(self) != TSI_OK) { return TSI_FAILED_PRECONDITION; } - if (self->vtable == NULL || self->vtable->extract_peer == NULL) { - return TSI_UNIMPLEMENTED; - } + if (self->vtable->extract_peer == NULL) return TSI_UNIMPLEMENTED; return self->vtable->extract_peer(self, peer); } @@ -180,14 +170,12 @@ tsi_result tsi_handshaker_create_frame_protector( tsi_handshaker *self, size_t *max_protected_frame_size, tsi_frame_protector **protector) { tsi_result result; - if (self == NULL || protector == NULL) return TSI_INVALID_ARGUMENT; - if (self->frame_protector_created) return TSI_FAILED_PRECONDITION; - if (tsi_handshaker_get_result(self) != TSI_OK) { - return TSI_FAILED_PRECONDITION; - } - if (self->vtable == NULL || self->vtable->create_frame_protector == NULL) { - return TSI_UNIMPLEMENTED; + if (self == NULL || self->vtable == NULL || protector == NULL) { + return TSI_INVALID_ARGUMENT; } + if (self->frame_protector_created) return TSI_FAILED_PRECONDITION; + if (tsi_handshaker_get_result(self) != TSI_OK) return TSI_FAILED_PRECONDITION; + if (self->vtable->create_frame_protector == NULL) return TSI_UNIMPLEMENTED; result = self->vtable->create_frame_protector(self, max_protected_frame_size, protector); if (result == TSI_OK) { @@ -198,14 +186,12 @@ tsi_result tsi_handshaker_create_frame_protector( tsi_result tsi_handshaker_next( tsi_handshaker *self, const unsigned char *received_bytes, - size_t received_bytes_size, unsigned char **bytes_to_send, + size_t received_bytes_size, const unsigned char **bytes_to_send, size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result, tsi_handshaker_on_next_done_cb cb, void *user_data) { - if (self == NULL) return TSI_INVALID_ARGUMENT; + if (self == NULL || self->vtable == NULL) return TSI_INVALID_ARGUMENT; if (self->handshaker_result_created) return TSI_FAILED_PRECONDITION; - if (self->vtable == NULL || self->vtable->next == NULL) { - return TSI_UNIMPLEMENTED; - } + if (self->vtable->next == NULL) return TSI_UNIMPLEMENTED; return self->vtable->next(self, received_bytes, received_bytes_size, bytes_to_send, bytes_to_send_size, handshaker_result, cb, user_data); @@ -220,21 +206,21 @@ void tsi_handshaker_destroy(tsi_handshaker *self) { tsi_result tsi_handshaker_result_extract_peer(const tsi_handshaker_result *self, tsi_peer *peer) { - if (self == NULL || peer == NULL) return TSI_INVALID_ARGUMENT; - memset(peer, 0, sizeof(tsi_peer)); - if (self->vtable == NULL || self->vtable->extract_peer == NULL) { - return TSI_UNIMPLEMENTED; + if (self == NULL || self->vtable == NULL || peer == NULL) { + return TSI_INVALID_ARGUMENT; } + memset(peer, 0, sizeof(tsi_peer)); + if (self->vtable->extract_peer == NULL) return TSI_UNIMPLEMENTED; return self->vtable->extract_peer(self, peer); } tsi_result tsi_handshaker_result_create_frame_protector( const tsi_handshaker_result *self, size_t *max_protected_frame_size, tsi_frame_protector **protector) { - if (self == NULL || protector == NULL) return TSI_INVALID_ARGUMENT; - if (self->vtable == NULL || self->vtable->create_frame_protector == NULL) { - return TSI_UNIMPLEMENTED; + if (self == NULL || self->vtable == NULL || protector == NULL) { + return TSI_INVALID_ARGUMENT; } + if (self->vtable->create_frame_protector == NULL) return TSI_UNIMPLEMENTED; return self->vtable->create_frame_protector(self, max_protected_frame_size, protector); } @@ -242,12 +228,11 @@ tsi_result tsi_handshaker_result_create_frame_protector( tsi_result tsi_handshaker_result_get_unused_bytes( const tsi_handshaker_result *self, const unsigned char **bytes, size_t *bytes_size) { - if (self == NULL || bytes == NULL || bytes_size == NULL) { + if (self == NULL || self->vtable == NULL || bytes == NULL || + bytes_size == NULL) { return TSI_INVALID_ARGUMENT; } - if (self->vtable == NULL || self->vtable->get_unused_bytes == NULL) { - return TSI_UNIMPLEMENTED; - } + if (self->vtable->get_unused_bytes == NULL) return TSI_UNIMPLEMENTED; return self->vtable->get_unused_bytes(self, bytes, bytes_size); } diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h index 2c7db6bca9..b0d7039850 100644 --- a/src/core/tsi/transport_security.h +++ b/src/core/tsi/transport_security.h @@ -70,7 +70,8 @@ typedef struct { tsi_frame_protector **protector); void (*destroy)(tsi_handshaker *self); tsi_result (*next)(tsi_handshaker *self, const unsigned char *received_bytes, - size_t received_bytes_size, unsigned char **bytes_to_send, + size_t received_bytes_size, + const unsigned char **bytes_to_send, size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result, tsi_handshaker_on_next_done_cb cb, void *user_data); @@ -86,6 +87,10 @@ struct tsi_handshaker { See transport_security_interface.h for documentation. */ typedef struct { tsi_result (*extract_peer)(const tsi_handshaker_result *self, tsi_peer *peer); + tsi_result (*create_zero_copy_grpc_protector)( + const tsi_handshaker_result *self, + size_t *max_output_protected_frame_size, + tsi_zero_copy_grpc_protector **protector); tsi_result (*create_frame_protector)(const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, tsi_frame_protector **protector); diff --git a/src/core/tsi/transport_security_adapter.c b/src/core/tsi/transport_security_adapter.c index b6dc660c47..1c2a57b3bd 100644 --- a/src/core/tsi/transport_security_adapter.c +++ b/src/core/tsi/transport_security_adapter.c @@ -66,8 +66,11 @@ static void adapter_result_destroy(tsi_handshaker_result *self) { } static const tsi_handshaker_result_vtable result_vtable = { - adapter_result_extract_peer, adapter_result_create_frame_protector, - adapter_result_get_unused_bytes, adapter_result_destroy, + adapter_result_extract_peer, + NULL, /* create_zero_copy_grpc_protector */ + adapter_result_create_frame_protector, + adapter_result_get_unused_bytes, + adapter_result_destroy, }; /* Ownership of wrapped tsi_handshaker is transferred to the result object. */ @@ -140,7 +143,7 @@ static void adapter_destroy(tsi_handshaker *self) { static tsi_result adapter_next( tsi_handshaker *self, const unsigned char *received_bytes, - size_t received_bytes_size, unsigned char **bytes_to_send, + size_t received_bytes_size, const unsigned char **bytes_to_send, size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result, tsi_handshaker_on_next_done_cb cb, void *user_data) { /* Input sanity check. */ diff --git a/src/core/tsi/transport_security_grpc.c b/src/core/tsi/transport_security_grpc.c new file mode 100644 index 0000000000..5bcfdfa61f --- /dev/null +++ b/src/core/tsi/transport_security_grpc.c @@ -0,0 +1,64 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/tsi/transport_security_grpc.h" + +/* This method creates a tsi_zero_copy_grpc_protector object. */ +tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector( + const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + tsi_zero_copy_grpc_protector **protector) { + if (self == NULL || self->vtable == NULL || protector == NULL) { + return TSI_INVALID_ARGUMENT; + } + if (self->vtable->create_zero_copy_grpc_protector == NULL) { + return TSI_UNIMPLEMENTED; + } + return self->vtable->create_zero_copy_grpc_protector( + self, max_output_protected_frame_size, protector); +} + +/* --- tsi_zero_copy_grpc_protector common implementation. --- + + Calls specific implementation after state/input validation. */ + +tsi_result tsi_zero_copy_grpc_protector_protect( + tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices, + grpc_slice_buffer *protected_slices) { + if (self == NULL || self->vtable == NULL || unprotected_slices == NULL || + protected_slices == NULL) { + return TSI_INVALID_ARGUMENT; + } + if (self->vtable->protect == NULL) return TSI_UNIMPLEMENTED; + return self->vtable->protect(self, unprotected_slices, protected_slices); +} + +tsi_result tsi_zero_copy_grpc_protector_unprotect( + tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices, + grpc_slice_buffer *unprotected_slices) { + if (self == NULL || self->vtable == NULL || protected_slices == NULL || + unprotected_slices == NULL) { + return TSI_INVALID_ARGUMENT; + } + if (self->vtable->unprotect == NULL) return TSI_UNIMPLEMENTED; + return self->vtable->unprotect(self, protected_slices, unprotected_slices); +} + +void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self) { + if (self == NULL) return; + self->vtable->destroy(self); +} diff --git a/src/core/tsi/transport_security_grpc.h b/src/core/tsi/transport_security_grpc.h new file mode 100644 index 0000000000..5ab5297cc4 --- /dev/null +++ b/src/core/tsi/transport_security_grpc.h @@ -0,0 +1,80 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_TSI_TRANSPORT_SECURITY_GRPC_H +#define GRPC_CORE_TSI_TRANSPORT_SECURITY_GRPC_H + +#include <grpc/slice_buffer.h> +#include "src/core/tsi/transport_security.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* This method creates a tsi_zero_copy_grpc_protector object. It return TSI_OK + assuming there is no fatal error. + The caller is responsible for destroying the protector. */ +tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector( + const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + tsi_zero_copy_grpc_protector **protector); + +/* -- tsi_zero_copy_grpc_protector object -- */ + +/* Outputs protected frames. + - unprotected_slices is the unprotected data to be protected. + - protected_slices is the protected output frames. One or more frames + may be produced in this protect function. + - This method returns TSI_OK in case of success or a specific error code in + case of failure. */ +tsi_result tsi_zero_copy_grpc_protector_protect( + tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices, + grpc_slice_buffer *protected_slices); + +/* Outputs unprotected bytes. + - protected_slices is the bytes of protected frames. + - unprotected_slices is the unprotected output data. + - This method returns TSI_OK in case of success. Success includes cases where + there is not enough data to output in which case unprotected_slices has 0 + bytes. */ +tsi_result tsi_zero_copy_grpc_protector_unprotect( + tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices, + grpc_slice_buffer *unprotected_slices); + +/* Destroys the tsi_zero_copy_grpc_protector object. */ +void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self); + +/* Base for tsi_zero_copy_grpc_protector implementations. */ +typedef struct { + tsi_result (*protect)(tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *unprotected_slices, + grpc_slice_buffer *protected_slices); + tsi_result (*unprotect)(tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *protected_slices, + grpc_slice_buffer *unprotected_slices); + void (*destroy)(tsi_zero_copy_grpc_protector *self); +} tsi_zero_copy_grpc_protector_vtable; + +struct tsi_zero_copy_grpc_protector { + const tsi_zero_copy_grpc_protector_vtable *vtable; +}; + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_TSI_TRANSPORT_SECURITY_GRPC_H */ diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h index 39ba8addc4..80c426bbdb 100644 --- a/src/core/tsi/transport_security_interface.h +++ b/src/core/tsi/transport_security_interface.h @@ -62,6 +62,15 @@ const char *tsi_result_to_string(tsi_result result); extern grpc_tracer_flag tsi_tracing_enabled; +/* -- tsi_zero_copy_grpc_protector object -- + + This object protects and unprotects grpc slice buffers with zero or minimized + memory copy once the handshake is done. Implementations of this object must be + thread compatible. This object depends on grpc and the details of this object + is defined in transport_security_grpc.h. */ + +typedef struct tsi_zero_copy_grpc_protector tsi_zero_copy_grpc_protector; + /* --- tsi_frame_protector object --- This object protects and unprotects buffers once the handshake is done. @@ -429,7 +438,7 @@ typedef void (*tsi_handshaker_on_next_done_cb)( tsi_handshaker object. */ tsi_result tsi_handshaker_next( tsi_handshaker *self, const unsigned char *received_bytes, - size_t received_bytes_size, unsigned char **bytes_to_send, + size_t received_bytes_size, const unsigned char **bytes_to_send, size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result, tsi_handshaker_on_next_done_cb cb, void *user_data); diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index 0f3a82c605..fc9d5599f2 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -18,6 +18,7 @@ using System; using System.Linq; +using System.Threading; using Grpc.Core; using NUnit.Framework; @@ -75,5 +76,19 @@ namespace Grpc.Core.Tests var parts = coreVersion.Split('.'); Assert.AreEqual(3, parts.Length); } + + [Test] + public void ShuttingDownEventIsFired() + { + var cts = new CancellationTokenSource(); + var handler = new EventHandler((sender, args) => { cts.Cancel(); }); + + GrpcEnvironment.ShuttingDown += handler; + var env = GrpcEnvironment.AddRef(); + GrpcEnvironment.ReleaseAsync().Wait(); + GrpcEnvironment.ShuttingDown -= handler; + + Assert.IsTrue(cts.Token.IsCancellationRequested); + } } } diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 0663ee9215..cbc7d2078c 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -49,7 +49,7 @@ namespace Grpc.Core readonly DebugStats debugStats = new DebugStats(); readonly AtomicCounter cqPickerCounter = new AtomicCounter(); - bool isClosed; + bool isShutdown; /// <summary> /// Returns a reference-counted instance of initialized gRPC environment. @@ -238,6 +238,12 @@ namespace Grpc.Core } /// <summary> + /// Occurs when <c>GrpcEnvironment</c> is about the start the shutdown logic. + /// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first). + /// </summary> + public static event EventHandler ShuttingDown; + + /// <summary> /// Creates gRPC environment. /// </summary> private GrpcEnvironment() @@ -311,13 +317,16 @@ namespace Grpc.Core /// </summary> private async Task ShutdownAsync() { - if (isClosed) + if (isShutdown) { - throw new InvalidOperationException("Close has already been called"); + throw new InvalidOperationException("ShutdownAsync has already been called"); } + + await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false); + await threadPool.StopAsync().ConfigureAwait(false); GrpcNativeShutdown(); - isClosed = true; + isShutdown = true; debugStats.CheckOK(); } diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 71e6904008..26095a78f9 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -260,7 +260,10 @@ class SendClientCloseOp : public Op { class SendServerStatusOp : public Op { public: - SendServerStatusOp() { grpc_metadata_array_init(&status_metadata); } + SendServerStatusOp() { + details = grpc_empty_slice(); + grpc_metadata_array_init(&status_metadata); + } ~SendServerStatusOp() { grpc_slice_unref(details); DestroyMetadataArray(&status_metadata); @@ -381,7 +384,10 @@ class ReadMessageOp : public Op { class ClientStatusOp : public Op { public: - ClientStatusOp() { grpc_metadata_array_init(&metadata_array); } + ClientStatusOp() { + grpc_metadata_array_init(&metadata_array); + status_details = grpc_empty_slice(); + } ~ClientStatusOp() { grpc_metadata_array_destroy(&metadata_array); diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index aebd298e32..b5246c4f31 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -188,6 +188,103 @@ describe('call', function() { }, TypeError); }); }); + describe('startBatch with message', function() { + it('should fail with null argument', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + var batch = {}; + batch[grpc.opType.SEND_MESSAGE] = null; + call.startBatch(batch, function(){}); + }, TypeError); + }); + it('should fail with numeric argument', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + var batch = {}; + batch[grpc.opType.SEND_MESSAGE] = 5; + call.startBatch(batch, function(){}); + }, TypeError); + }); + it('should fail with string argument', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + var batch = {}; + batch[grpc.opType.SEND_MESSAGE] = 'value'; + call.startBatch(batch, function(){}); + }, TypeError); + }); + }); + describe('startBatch with status', function() { + it('should fail without a code', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + var batch = {}; + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + details: 'details string', + metadata: {} + }; + call.startBatch(batch, function(){}); + }, TypeError); + }); + it('should fail without details', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + var batch = {}; + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: 0, + metadata: {} + }; + call.startBatch(batch, function(){}); + }, TypeError); + }); + it('should fail without metadata', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + var batch = {}; + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: 0, + details: 'details string' + }; + call.startBatch(batch, function(){}); + }, TypeError); + }); + it('should fail with incorrectly typed code argument', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + var batch = {}; + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: 'code string', + details: 'details string', + metadata: {} + }; + call.startBatch(batch, function(){}); + }, TypeError); + }); + it('should fail with incorrectly typed details argument', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + var batch = {}; + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: 0, + details: 5, + metadata: {} + }; + call.startBatch(batch, function(){}); + }, TypeError); + }); + it('should fail with incorrectly typed metadata argument', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + var batch = {}; + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: 0, + details: 'details string', + metadata: 'abc' + }; + call.startBatch(batch, function(){}); + }, TypeError); + }); + }); describe('cancel', function() { it('should succeed', function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index e52d43e81d..dc4d28f95b 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -246,6 +246,7 @@ CORE_SOURCE_FILES = [ 'src/core/tsi/fake_transport_security.c', 'src/core/tsi/gts_transport_security.c', 'src/core/tsi/ssl_transport_security.c', + 'src/core/tsi/transport_security_grpc.c', 'src/core/tsi/transport_security.c', 'src/core/tsi/transport_security_adapter.c', 'src/core/ext/transport/chttp2/server/chttp2_server.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index fd1a13563a..4901fc9218 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1421,6 +1421,8 @@ src/core/tsi/transport_security.c \ src/core/tsi/transport_security.h \ src/core/tsi/transport_security_adapter.c \ src/core/tsi/transport_security_adapter.h \ +src/core/tsi/transport_security_grpc.c \ +src/core/tsi/transport_security_grpc.h \ src/core/tsi/transport_security_interface.h \ third_party/nanopb/pb_common.c \ third_party/nanopb/pb_decode.c \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 41a299dd62..30fef6fc0a 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -9095,16 +9095,15 @@ "deps": [ "gpr", "grpc_base", - "grpc_trace" + "grpc_trace", + "tsi_interface" ], "headers": [ "src/core/tsi/fake_transport_security.h", "src/core/tsi/gts_transport_security.h", "src/core/tsi/ssl_transport_security.h", "src/core/tsi/ssl_types.h", - "src/core/tsi/transport_security.h", - "src/core/tsi/transport_security_adapter.h", - "src/core/tsi/transport_security_interface.h" + "src/core/tsi/transport_security_grpc.h" ], "is_filegroup": true, "language": "c", @@ -9117,6 +9116,26 @@ "src/core/tsi/ssl_transport_security.c", "src/core/tsi/ssl_transport_security.h", "src/core/tsi/ssl_types.h", + "src/core/tsi/transport_security_grpc.c", + "src/core/tsi/transport_security_grpc.h" + ], + "third_party": false, + "type": "filegroup" + }, + { + "deps": [ + "gpr", + "grpc_trace" + ], + "headers": [ + "src/core/tsi/transport_security.h", + "src/core/tsi/transport_security_adapter.h", + "src/core/tsi/transport_security_interface.h" + ], + "is_filegroup": true, + "language": "c", + "name": "tsi_interface", + "src": [ "src/core/tsi/transport_security.c", "src/core/tsi/transport_security.h", "src/core/tsi/transport_security_adapter.c", diff --git a/tools/run_tests/run_build_statistics.py b/tools/run_tests/run_build_statistics.py index d63dc3e86f..0ac6fc50aa 100755 --- a/tools/run_tests/run_build_statistics.py +++ b/tools/run_tests/run_build_statistics.py @@ -152,16 +152,13 @@ def _process_build(json_url, console_url): failure_count = test_result['failCount'] build_result['pass_count'] = test_result['passCount'] build_result['failure_count'] = failure_count + # This means Jenkins failure occurred. build_result['no_report_files_found'] = _no_report_files_found(html) - if failure_count > 0: + # Only check errors if Jenkins failure occurred. + if build_result['no_report_files_found']: error_list, known_error_count = _scrape_for_known_errors(html) - unknown_error_count = failure_count - known_error_count - # This can happen if the same error occurs multiple times in one test. - if failure_count < known_error_count: - print('====> Some errors are duplicates.') - unknown_error_count = 0 - error_list.append({'description': _UNKNOWN_ERROR, - 'count': unknown_error_count}) + if not error_list: + error_list.append({'description': _UNKNOWN_ERROR, 'count': 1}) except Exception as e: print('====> Got exception for %s: %s.' % (json_url, str(e))) print('====> Parsing errors from %s.' % console_url) @@ -176,6 +173,8 @@ def _process_build(json_url, console_url): if error_list: build_result['error'] = error_list + else: + build_result['error'] = [{'description': '', 'count': 0}] return build_result diff --git a/tools/run_tests/sanity/core_untyped_structs.sh b/tools/run_tests/sanity/core_untyped_structs.sh new file mode 100755 index 0000000000..792dd68fdd --- /dev/null +++ b/tools/run_tests/sanity/core_untyped_structs.sh @@ -0,0 +1,27 @@ +#!/bin/sh +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +cd `dirname $0`/../../.. + +# +# Make sure that all core struct/unions have a name or are typedef'ed +# + +egrep -Irn '(struct|union) *{' include/grpc | + egrep -v typedef | + diff - /dev/null + diff --git a/tools/run_tests/sanity/sanity_tests.yaml b/tools/run_tests/sanity/sanity_tests.yaml index a86ebee7b4..7e582bc40b 100644 --- a/tools/run_tests/sanity/sanity_tests.yaml +++ b/tools/run_tests/sanity/sanity_tests.yaml @@ -6,6 +6,7 @@ - script: tools/run_tests/sanity/check_test_filtering.py - script: tools/run_tests/sanity/check_tracer_sanity.py - script: tools/run_tests/sanity/core_banned_functions.py +- script: tools/run_tests/sanity/core_untyped_structs.sh - script: tools/buildgen/generate_projects.sh -j 3 cpu_cost: 3 - script: tools/distrib/check_copyright.py diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 3b18b42ce9..8a659280a4 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -347,6 +347,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\tsi\gts_transport_security.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\tsi\ssl_transport_security.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\tsi\ssl_types.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_adapter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_interface.h" /> @@ -892,6 +893,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\tsi\ssl_transport_security.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_adapter.c"> diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 546f8a57a8..6879047f55 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -556,6 +556,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\tsi\ssl_transport_security.c"> <Filter>src\core\tsi</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.c"> + <Filter>src\core\tsi</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\tsi\transport_security.c"> <Filter>src\core\tsi</Filter> </ClCompile> @@ -1010,6 +1013,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\tsi\ssl_types.h"> <Filter>src\core\tsi</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security_grpc.h"> + <Filter>src\core\tsi</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\tsi\transport_security.h"> <Filter>src\core\tsi</Filter> </ClInclude> |