aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2017-06-06 13:26:36 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2017-06-06 13:26:36 -0700
commitb89b5011ecfe5135e03e98fb058e0ad345747007 (patch)
treec49d57c3c752150148840b672474ccfeb81d6294 /src
parent4ebace71b071d9dd38a089c88b737b52fe57dfd5 (diff)
parentb1332047d049d788a59284a561f4d6c2c2488792 (diff)
Merge remote-tracking branch 'upstream/master' into srv_record
Diffstat (limited to 'src')
-rw-r--r--src/compiler/php_generator.cc21
-rw-r--r--src/core/ext/census/intrusive_hash_map.c320
-rw-r--r--src/core/ext/census/intrusive_hash_map.h167
-rw-r--r--src/core/ext/census/intrusive_hash_map_internal.h63
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c77
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c115
-rw-r--r--src/core/ext/filters/client_channel/client_channel.h6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c459
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.c1
-rw-r--r--src/core/lib/http/httpcli.c4
-rw-r--r--src/core/lib/http/httpcli_security_connector.c4
-rw-r--r--src/core/lib/security/transport/security_connector.c19
-rw-r--r--src/core/lib/security/transport/security_handshaker.c220
-rw-r--r--src/cpp/common/version_cc.cc2
-rw-r--r--src/cpp/server/server_builder.cc13
-rw-r--r--src/csharp/.editorconfig7
-rwxr-xr-xsrc/csharp/Grpc.Auth/Grpc.Auth.csproj7
-rwxr-xr-xsrc/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj11
-rw-r--r--src/csharp/Grpc.Core/Channel.cs41
-rwxr-xr-xsrc/csharp/Grpc.Core/Common.csproj.include4
-rwxr-xr-xsrc/csharp/Grpc.Core/Grpc.Core.csproj5
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs11
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionRegistry.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs4
-rw-r--r--src/csharp/Grpc.Core/Properties/AssemblyInfo.cs6
-rwxr-xr-xsrc/csharp/Grpc.Core/Version.csproj.include2
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs4
-rwxr-xr-xsrc/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj7
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ClientRunners.cs32
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Histogram.cs57
-rw-r--r--src/csharp/Grpc.IntegrationTesting/HistogramTest.cs26
-rw-r--r--src/csharp/Grpc.IntegrationTesting/QpsWorker.cs4
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ServerRunners.cs4
-rw-r--r--src/csharp/Grpc.Microbenchmarks/.gitignore2
-rw-r--r--src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj28
-rw-r--r--src/csharp/Grpc.Microbenchmarks/Program.cs55
-rw-r--r--src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs106
-rw-r--r--src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs79
-rwxr-xr-xsrc/csharp/Grpc.Reflection/Grpc.Reflection.csproj7
-rw-r--r--src/csharp/Grpc.sln8
-rwxr-xr-xsrc/csharp/build_packages_dotnetcli.bat12
-rwxr-xr-xsrc/csharp/build_packages_dotnetcli.sh16
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c86
-rw-r--r--src/node/README.md64
-rw-r--r--src/node/ext/call.cc37
-rw-r--r--src/node/ext/call.h1
-rw-r--r--src/node/health_check/package.json4
-rw-r--r--src/node/index.js138
-rw-r--r--src/node/performance/benchmark_client.js42
-rw-r--r--src/node/src/client.js266
-rw-r--r--src/node/src/common.js64
-rw-r--r--src/node/src/constants.js24
-rw-r--r--src/node/src/credentials.js63
-rw-r--r--src/node/src/grpc_extension.js4
-rw-r--r--src/node/src/metadata.js15
-rw-r--r--src/node/src/protobuf_js_5_common.js9
-rw-r--r--src/node/src/protobuf_js_6_common.js9
-rw-r--r--src/node/src/server.js445
-rw-r--r--src/node/test/common_test.js1
-rw-r--r--src/node/test/surface_test.js33
-rw-r--r--src/node/tools/package.json2
-rw-r--r--src/objective-c/!ProtoCompiler-gRPCPlugin.podspec2
-rw-r--r--src/objective-c/GRPCClient/private/version.h2
-rw-r--r--src/php/composer.json2
-rw-r--r--src/php/ext/grpc/version.h2
-rw-r--r--src/python/grpcio/grpc/_channel.py7
-rw-r--r--src/python/grpcio/grpc/_grpcio_metadata.py2
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
-rw-r--r--src/python/grpcio/grpc_version.py2
-rw-r--r--src/python/grpcio_health_checking/grpc_version.py2
-rw-r--r--src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py5
-rw-r--r--src/python/grpcio_reflection/grpc_version.py2
-rw-r--r--src/python/grpcio_tests/grpc_version.py2
-rw-r--r--src/python/grpcio_tests/tests/tests.json1
-rw-r--r--src/python/grpcio_tests/tests/unit/_reconnect_test.py70
-rwxr-xr-xsrc/ruby/end2end/channel_closing_driver.rb5
-rwxr-xr-xsrc/ruby/end2end/channel_state_driver.rb3
-rwxr-xr-xsrc/ruby/end2end/grpc_class_init_client.rb96
-rwxr-xr-xsrc/ruby/end2end/grpc_class_init_driver.rb53
-rwxr-xr-xsrc/ruby/end2end/multiple_killed_watching_threads_driver.rb63
-rwxr-xr-xsrc/ruby/end2end/sig_int_during_channel_watch_client.rb2
-rwxr-xr-xsrc/ruby/end2end/sig_int_during_channel_watch_driver.rb5
-rw-r--r--src/ruby/ext/grpc/rb_channel.c474
-rw-r--r--src/ruby/ext/grpc/rb_event_thread.c12
-rw-r--r--src/ruby/ext/grpc/rb_grpc.c20
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c4
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h6
-rw-r--r--src/ruby/lib/grpc/grpc.rb2
-rw-r--r--src/ruby/lib/grpc/version.rb2
-rw-r--r--src/ruby/spec/channel_connection_spec.rb34
-rw-r--r--src/ruby/tools/version.rb2
91 files changed, 3018 insertions, 1220 deletions
diff --git a/src/compiler/php_generator.cc b/src/compiler/php_generator.cc
index 7d51d40301..67c4c80a7b 100644
--- a/src/compiler/php_generator.cc
+++ b/src/compiler/php_generator.cc
@@ -67,12 +67,11 @@ void PrintMethod(const MethodDescriptor *method, Printer *out) {
vars["input_type_id"] = MessageIdentifierName(input_type->full_name());
vars["output_type_id"] = MessageIdentifierName(output_type->full_name());
- out->Print("/**\n");
- out->Print(GetPHPComments(method, " *").c_str());
+ out->Print(GetPHPComments(method, " //").c_str());
if (method->client_streaming()) {
out->Print(vars,
- " * @param array $$metadata metadata\n"
- " * @param array $$options call options\n */\n"
+ " // @param array $$metadata metadata\n"
+ " // @param array $$options call options\n"
"public function $name$($$metadata = [], "
"$$options = []) {\n");
out->Indent();
@@ -87,9 +86,9 @@ void PrintMethod(const MethodDescriptor *method, Printer *out) {
"$$metadata, $$options);\n");
} else {
out->Print(vars,
- " * @param \\$input_type_id$ $$argument input argument\n"
- " * @param array $$metadata metadata\n"
- " * @param array $$options call options\n */\n"
+ " // @param \\$input_type_id$ $$argument input argument\n"
+ " // @param array $$metadata metadata\n"
+ " // @param array $$options call options\n"
"public function $name$(\\$input_type_id$ $$argument,\n"
" $$metadata = [], $$options = []) {\n");
out->Indent();
@@ -116,10 +115,10 @@ void PrintService(const ServiceDescriptor *service, Printer *out) {
out->Print(vars, "class $name$Client extends \\Grpc\\BaseStub {\n\n");
out->Indent();
out->Print(
- "/**\n * @param string $$hostname hostname\n"
- " * @param array $$opts channel options\n"
- " * @param \\Grpc\\Channel $$channel (optional) re-use channel "
- "object\n */\n"
+ " // @param string $$hostname hostname\n"
+ " // @param array $$opts channel options\n"
+ " // @param \\Grpc\\Channel $$channel (optional) re-use channel "
+ "object\n"
"public function __construct($$hostname, $$opts, "
"$$channel = null) {\n");
out->Indent();
diff --git a/src/core/ext/census/intrusive_hash_map.c b/src/core/ext/census/intrusive_hash_map.c
new file mode 100644
index 0000000000..9f56b765e1
--- /dev/null
+++ b/src/core/ext/census/intrusive_hash_map.c
@@ -0,0 +1,320 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/census/intrusive_hash_map.h"
+#include <string.h>
+
+extern bool hm_index_compare(const hm_index *A, const hm_index *B);
+
+/* Simple hashing function that takes lower 32 bits. */
+static __inline uint32_t chunked_vector_hasher(uint64_t key) {
+ return (uint32_t)key;
+}
+
+/* Vector chunks are 1MiB divided by pointer size. */
+static const size_t VECTOR_CHUNK_SIZE = (1 << 20) / sizeof(void *);
+
+/* Helper functions which return buckets from the chunked vector. */
+static __inline void **get_mutable_bucket(const chunked_vector *buckets,
+ uint32_t index) {
+ if (index < VECTOR_CHUNK_SIZE) {
+ return &buckets->first_[index];
+ }
+ size_t rest_index = (index - VECTOR_CHUNK_SIZE) / VECTOR_CHUNK_SIZE;
+ return &buckets->rest_[rest_index][index % VECTOR_CHUNK_SIZE];
+}
+
+static __inline void *get_bucket(const chunked_vector *buckets,
+ uint32_t index) {
+ if (index < VECTOR_CHUNK_SIZE) {
+ return buckets->first_[index];
+ }
+ size_t rest_index = (index - VECTOR_CHUNK_SIZE) / VECTOR_CHUNK_SIZE;
+ return buckets->rest_[rest_index][index % VECTOR_CHUNK_SIZE];
+}
+
+/* Helper function. */
+static __inline size_t RestSize(const chunked_vector *vec) {
+ return (vec->size_ <= VECTOR_CHUNK_SIZE)
+ ? 0
+ : (vec->size_ - VECTOR_CHUNK_SIZE - 1) / VECTOR_CHUNK_SIZE + 1;
+}
+
+/* Initialize chunked vector to size of 0. */
+static void chunked_vector_init(chunked_vector *vec) {
+ vec->size_ = 0;
+ vec->first_ = NULL;
+ vec->rest_ = NULL;
+}
+
+/* Clear chunked vector and free all memory that has been allocated then
+ initialize chunked vector. */
+static void chunked_vector_clear(chunked_vector *vec) {
+ if (vec->first_ != NULL) {
+ gpr_free(vec->first_);
+ }
+ if (vec->rest_ != NULL) {
+ size_t rest_size = RestSize(vec);
+ for (size_t i = 0; i < rest_size; ++i) {
+ if (vec->rest_[i] != NULL) {
+ gpr_free(vec->rest_[i]);
+ }
+ }
+ gpr_free(vec->rest_);
+ }
+ chunked_vector_init(vec);
+}
+
+/* Clear chunked vector and then resize it to n entries. Allow the first 1MB to
+ be read w/o an extra cache miss. The rest of the elements are stored in an
+ array of arrays to avoid large mallocs. */
+static void chunked_vector_reset(chunked_vector *vec, size_t n) {
+ chunked_vector_clear(vec);
+ vec->size_ = n;
+ if (n <= VECTOR_CHUNK_SIZE) {
+ vec->first_ = (void **)gpr_malloc(sizeof(void *) * n);
+ memset(vec->first_, 0, sizeof(void *) * n);
+ } else {
+ vec->first_ = (void **)gpr_malloc(sizeof(void *) * VECTOR_CHUNK_SIZE);
+ memset(vec->first_, 0, sizeof(void *) * VECTOR_CHUNK_SIZE);
+ size_t rest_size = RestSize(vec);
+ vec->rest_ = (void ***)gpr_malloc(sizeof(void **) * rest_size);
+ memset(vec->rest_, 0, sizeof(void **) * rest_size);
+ int i = 0;
+ n -= VECTOR_CHUNK_SIZE;
+ while (n > 0) {
+ size_t this_size = GPR_MIN(n, VECTOR_CHUNK_SIZE);
+ vec->rest_[i] = (void **)gpr_malloc(sizeof(void *) * this_size);
+ memset(vec->rest_[i], 0, sizeof(void *) * this_size);
+ n -= this_size;
+ ++i;
+ }
+ }
+}
+
+void intrusive_hash_map_init(intrusive_hash_map *hash_map,
+ uint32_t initial_log2_table_size) {
+ hash_map->log2_num_buckets = initial_log2_table_size;
+ hash_map->num_items = 0;
+ uint32_t num_buckets = (uint32_t)1 << hash_map->log2_num_buckets;
+ hash_map->extend_threshold = num_buckets >> 1;
+ chunked_vector_init(&hash_map->buckets);
+ chunked_vector_reset(&hash_map->buckets, num_buckets);
+ hash_map->hash_mask = num_buckets - 1;
+}
+
+bool intrusive_hash_map_empty(const intrusive_hash_map *hash_map) {
+ return hash_map->num_items == 0;
+}
+
+size_t intrusive_hash_map_size(const intrusive_hash_map *hash_map) {
+ return hash_map->num_items;
+}
+
+void intrusive_hash_map_end(const intrusive_hash_map *hash_map, hm_index *idx) {
+ idx->bucket_index = (uint32_t)hash_map->buckets.size_;
+ GPR_ASSERT(idx->bucket_index <= UINT32_MAX);
+ idx->item = NULL;
+}
+
+void intrusive_hash_map_next(const intrusive_hash_map *hash_map,
+ hm_index *idx) {
+ idx->item = idx->item->hash_link;
+ while (idx->item == NULL) {
+ idx->bucket_index++;
+ if (idx->bucket_index >= hash_map->buckets.size_) {
+ /* Reached end of table. */
+ idx->item = NULL;
+ return;
+ }
+ idx->item = (hm_item *)get_bucket(&hash_map->buckets, idx->bucket_index);
+ }
+}
+
+void intrusive_hash_map_begin(const intrusive_hash_map *hash_map,
+ hm_index *idx) {
+ for (uint32_t i = 0; i < hash_map->buckets.size_; ++i) {
+ if (get_bucket(&hash_map->buckets, i) != NULL) {
+ idx->bucket_index = i;
+ idx->item = (hm_item *)get_bucket(&hash_map->buckets, i);
+ return;
+ }
+ }
+ intrusive_hash_map_end(hash_map, idx);
+}
+
+hm_item *intrusive_hash_map_find(const intrusive_hash_map *hash_map,
+ uint64_t key) {
+ uint32_t index = chunked_vector_hasher(key) & hash_map->hash_mask;
+
+ hm_item *p = (hm_item *)get_bucket(&hash_map->buckets, index);
+ while (p != NULL) {
+ if (key == p->key) {
+ return p;
+ }
+ p = p->hash_link;
+ }
+ return NULL;
+}
+
+hm_item *intrusive_hash_map_erase(intrusive_hash_map *hash_map, uint64_t key) {
+ uint32_t index = chunked_vector_hasher(key) & hash_map->hash_mask;
+
+ hm_item **slot = (hm_item **)get_mutable_bucket(&hash_map->buckets, index);
+ hm_item *p = *slot;
+ if (p == NULL) {
+ return NULL;
+ }
+
+ if (key == p->key) {
+ *slot = p->hash_link;
+ p->hash_link = NULL;
+ hash_map->num_items--;
+ return p;
+ }
+
+ hm_item *prev = p;
+ p = p->hash_link;
+
+ while (p) {
+ if (key == p->key) {
+ prev->hash_link = p->hash_link;
+ p->hash_link = NULL;
+ hash_map->num_items--;
+ return p;
+ }
+ prev = p;
+ p = p->hash_link;
+ }
+ return NULL;
+}
+
+/* Insert an hm_item* into the underlying chunked vector. hash_mask is
+ * array_size-1. Returns true if it is a new hm_item and false if the hm_item
+ * already existed.
+ */
+static __inline bool intrusive_hash_map_internal_insert(chunked_vector *buckets,
+ uint32_t hash_mask,
+ hm_item *item) {
+ const uint64_t key = item->key;
+ uint32_t index = chunked_vector_hasher(key) & hash_mask;
+ hm_item **slot = (hm_item **)get_mutable_bucket(buckets, index);
+ hm_item *p = *slot;
+ item->hash_link = p;
+
+ /* Check to see if key already exists. */
+ while (p) {
+ if (p->key == key) {
+ return false;
+ }
+ p = p->hash_link;
+ }
+
+ /* Otherwise add new entry. */
+ *slot = item;
+ return true;
+}
+
+/* Extend the allocated number of elements in the hash map by a factor of 2. */
+void intrusive_hash_map_extend(intrusive_hash_map *hash_map) {
+ uint32_t new_log2_num_buckets = 1 + hash_map->log2_num_buckets;
+ uint32_t new_num_buckets = (uint32_t)1 << new_log2_num_buckets;
+ GPR_ASSERT(new_num_buckets <= UINT32_MAX && new_num_buckets > 0);
+ chunked_vector new_buckets;
+ chunked_vector_init(&new_buckets);
+ chunked_vector_reset(&new_buckets, new_num_buckets);
+ uint32_t new_hash_mask = new_num_buckets - 1;
+
+ hm_index cur_idx;
+ hm_index end_idx;
+ intrusive_hash_map_end(hash_map, &end_idx);
+ intrusive_hash_map_begin(hash_map, &cur_idx);
+ while (!hm_index_compare(&cur_idx, &end_idx)) {
+ hm_item *new_item = cur_idx.item;
+ intrusive_hash_map_next(hash_map, &cur_idx);
+ intrusive_hash_map_internal_insert(&new_buckets, new_hash_mask, new_item);
+ }
+
+ /* Set values for new chunked_vector. extend_threshold is set to half of
+ * new_num_buckets. */
+ hash_map->log2_num_buckets = new_log2_num_buckets;
+ chunked_vector_clear(&hash_map->buckets);
+ hash_map->buckets = new_buckets;
+ hash_map->hash_mask = new_hash_mask;
+ hash_map->extend_threshold = new_num_buckets >> 1;
+}
+
+/* Insert a hm_item. The hm_item must remain live until it is removed from the
+ table. This object does not take the ownership of hm_item. The caller must
+ remove this hm_item from the table and delete it before this table is
+ deleted. If hm_item exists already num_items is not changed. */
+bool intrusive_hash_map_insert(intrusive_hash_map *hash_map, hm_item *item) {
+ if (hash_map->num_items >= hash_map->extend_threshold) {
+ intrusive_hash_map_extend(hash_map);
+ }
+ if (intrusive_hash_map_internal_insert(&hash_map->buckets,
+ hash_map->hash_mask, item)) {
+ hash_map->num_items++;
+ return true;
+ }
+ return false;
+}
+
+void intrusive_hash_map_clear(intrusive_hash_map *hash_map,
+ void (*free_object)(void *)) {
+ hm_index cur;
+ hm_index end;
+ intrusive_hash_map_end(hash_map, &end);
+ intrusive_hash_map_begin(hash_map, &cur);
+
+ while (!hm_index_compare(&cur, &end)) {
+ hm_index next = cur;
+ intrusive_hash_map_next(hash_map, &next);
+ if (cur.item != NULL) {
+ hm_item *item = intrusive_hash_map_erase(hash_map, cur.item->key);
+ (*free_object)((void *)item);
+ gpr_free(item);
+ }
+ cur = next;
+ }
+}
+
+void intrusive_hash_map_free(intrusive_hash_map *hash_map,
+ void (*free_object)(void *)) {
+ intrusive_hash_map_clear(hash_map, (*free_object));
+ hash_map->num_items = 0;
+ hash_map->extend_threshold = 0;
+ hash_map->log2_num_buckets = 0;
+ hash_map->hash_mask = 0;
+ chunked_vector_clear(&hash_map->buckets);
+}
diff --git a/src/core/ext/census/intrusive_hash_map.h b/src/core/ext/census/intrusive_hash_map.h
new file mode 100644
index 0000000000..e316bf4b16
--- /dev/null
+++ b/src/core/ext/census/intrusive_hash_map.h
@@ -0,0 +1,167 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H
+#define GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H
+
+#include "src/core/ext/census/intrusive_hash_map_internal.h"
+
+/* intrusive_hash_map is a fast chained hash table. This hash map is faster than
+ * a dense hash map when the application calls insert and erase more often than
+ * find. When the workload is dominated by find() a dense hash map may be
+ * faster.
+ *
+ * intrusive_hash_map uses an intrusive header placed within a user defined
+ * struct. The header field IHM_key MUST be set to a valid value before
+ * insertion into the hash map or undefined behavior may occur. The header field
+ * IHM_hash_link MUST to be set to NULL initially.
+ *
+ * EXAMPLE USAGE:
+ *
+ * typedef struct string_item {
+ * INTRUSIVE_HASH_MAP_HEADER;
+ * // User data.
+ * char *str_buf;
+ * uint16_t len;
+ * } string_item;
+ *
+ * static string_item *make_string_item(uint64_t key, const char *buf,
+ * uint16_t len) {
+ * string_item *item = (string_item *)gpr_malloc(sizeof(string_item));
+ * item->IHM_key = key;
+ * item->IHM_hash_link = NULL;
+ * item->len = len;
+ * item->str_buf = (char *)malloc(len);
+ * memcpy(item->str_buf, buf, len);
+ * return item;
+ * }
+ *
+ * intrusive_hash_map hash_map;
+ * intrusive_hash_map_init(&hash_map, 4);
+ * string_item *new_item1 = make_string_item(10, "test1", 5);
+ * bool ok = intrusive_hash_map_insert(&hash_map, (hm_item *)new_item1);
+ *
+ * string_item *item1 =
+ * (string_item *)intrusive_hash_map_find(&hash_map, 10);
+ */
+
+/* Hash map item. Stores key and a pointer to the actual object. A user defined
+ * version of this can be passed in provided the first 2 entries (key and
+ * hash_link) are the same. These entries must be first in the user defined
+ * struct. Pointer to struct will need to be cast as (hm_item *) when passed to
+ * hash map. This allows it to be intrusive. */
+typedef struct hm_item {
+ uint64_t key;
+ struct hm_item *hash_link;
+ /* Optional user defined data after this. */
+} hm_item;
+
+/* Macro provided for ease of use. This must be first in the user defined
+ * struct (i.e. uint64_t key and hm_item * must be the first two elements in
+ * that order). */
+#define INTRUSIVE_HASH_MAP_HEADER \
+ uint64_t IHM_key; \
+ struct hm_item *IHM_hash_link
+
+/* Index struct which acts as a pseudo-iterator within the hash map. */
+typedef struct hm_index {
+ uint32_t bucket_index; // hash map bucket index.
+ hm_item *item; // Pointer to hm_item within the hash map.
+} hm_index;
+
+/* Returns true if two hm_indices point to the same object within the hash map
+ * and false otherwise. */
+__inline bool hm_index_compare(const hm_index *A, const hm_index *B) {
+ return (A->item == B->item && A->bucket_index == B->bucket_index);
+}
+
+/*
+ * Helper functions for iterating over the hash map.
+ */
+
+/* On return idx will contain an invalid index which is always equal to
+ * hash_map->buckets.size_ */
+void intrusive_hash_map_end(const intrusive_hash_map *hash_map, hm_index *idx);
+
+/* Iterates index to the next valid entry in the hash map and stores the
+ * index within idx. If end of table is reached, idx will contain the same
+ * values as if intrusive_hash_map_end() was called. */
+void intrusive_hash_map_next(const intrusive_hash_map *hash_map, hm_index *idx);
+
+/* On return, idx will contain the index of the first non-null entry in the hash
+ * map. If the hash map is empty, idx will contain the same values as if
+ * intrusive_hash_map_end() was called. */
+void intrusive_hash_map_begin(const intrusive_hash_map *hash_map,
+ hm_index *idx);
+
+/* Initialize intrusive hash map data structure. This must be called before
+ * the hash map can be used. The initial size of an intrusive hash map will be
+ * 2^initial_log2_map_size (valid range is [0, 31]). */
+void intrusive_hash_map_init(intrusive_hash_map *hash_map,
+ uint32_t initial_log2_map_size);
+
+/* Returns true if the hash map is empty and false otherwise. */
+bool intrusive_hash_map_empty(const intrusive_hash_map *hash_map);
+
+/* Returns the number of elements currently in the hash map. */
+size_t intrusive_hash_map_size(const intrusive_hash_map *hash_map);
+
+/* Find a hm_item within the hash map by key. Returns NULL if item was not
+ * found. */
+hm_item *intrusive_hash_map_find(const intrusive_hash_map *hash_map,
+ uint64_t key);
+
+/* Erase the hm_item that corresponds with key. If the hm_item is found, return
+ * the pointer to the hm_item. Else returns NULL. */
+hm_item *intrusive_hash_map_erase(intrusive_hash_map *hash_map, uint64_t key);
+
+/* Attempts to insert a new hm_item into the hash map. If an element with the
+ * same key already exists, it will not insert the new item and return false.
+ * Otherwise, it will insert the new item and return true. */
+bool intrusive_hash_map_insert(intrusive_hash_map *hash_map, hm_item *item);
+
+/* Clears entire contents of the hash map, but leaves internal data structure
+ * untouched. Second argument takes a function pointer to a function that will
+ * free the object designated by the user and pointed to by hash_map->value. */
+void intrusive_hash_map_clear(intrusive_hash_map *hash_map,
+ void (*free_object)(void *));
+
+/* Erase all contents of hash map and free the memory. Hash map is invalid
+ * after calling this function and cannot be used until it has been
+ * reinitialized (intrusive_hash_map_init()). This function takes a function
+ * pointer to a function that will free the object designated by the user and
+ * pointed to by hash_map->value. */
+void intrusive_hash_map_free(intrusive_hash_map *hash_map,
+ void (*free_object)(void *));
+
+#endif /* GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H */
diff --git a/src/core/ext/census/intrusive_hash_map_internal.h b/src/core/ext/census/intrusive_hash_map_internal.h
new file mode 100644
index 0000000000..76a9a3a722
--- /dev/null
+++ b/src/core/ext/census/intrusive_hash_map_internal.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H
+#define GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+#include <stdbool.h>
+
+/* The chunked vector is a data structure that allocates buckets for use in the
+ * hash map. ChunkedVector is logically equivalent to T*[N] (cast void* as
+ * T*). It's internally implemented as an array of 1MB arrays to avoid
+ * allocating large consecutive memory chunks. This is an internal data
+ * structure that should never be accessed directly. */
+typedef struct chunked_vector {
+ size_t size_;
+ void **first_;
+ void ***rest_;
+} chunked_vector;
+
+/* Core intrusive hash map data structure. All internal elements are managed by
+ * functions and should not be altered manually. */
+typedef struct intrusive_hash_map {
+ uint32_t num_items;
+ uint32_t extend_threshold;
+ uint32_t log2_num_buckets;
+ uint32_t hash_mask;
+ chunked_vector buckets;
+} intrusive_hash_map;
+
+#endif /* GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H */
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index f83670db82..04666edbec 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -67,9 +67,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
typedef enum {
WAITING,
- CALLING_BACK,
+ READY_TO_CALL_BACK,
CALLING_BACK_AND_FINISHED,
- CALLED_BACK
} callback_phase;
typedef struct {
@@ -77,11 +76,13 @@ typedef struct {
callback_phase phase;
grpc_closure on_complete;
grpc_closure on_timeout;
+ grpc_closure watcher_timer_init;
grpc_timer alarm;
grpc_connectivity_state state;
grpc_completion_queue *cq;
grpc_cq_completion completion_storage;
grpc_channel *channel;
+ grpc_error *error;
void *tag;
} state_watcher;
@@ -105,11 +106,8 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
gpr_mu_lock(&w->mu);
switch (w->phase) {
case WAITING:
- case CALLED_BACK:
+ case READY_TO_CALL_BACK:
GPR_UNREACHABLE_CODE(return );
- case CALLING_BACK:
- w->phase = CALLED_BACK;
- break;
case CALLING_BACK_AND_FINISHED:
delete = 1;
break;
@@ -123,10 +121,14 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
bool due_to_completion, grpc_error *error) {
- int delete = 0;
-
if (due_to_completion) {
grpc_timer_cancel(exec_ctx, &w->alarm);
+ } else {
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ grpc_client_channel_watch_connectivity_state(exec_ctx, client_channel_elem,
+ grpc_cq_pollset(w->cq), NULL,
+ &w->on_complete, NULL);
}
gpr_mu_lock(&w->mu);
@@ -147,25 +149,27 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
}
switch (w->phase) {
case WAITING:
- w->phase = CALLING_BACK;
- grpc_cq_end_op(exec_ctx, w->cq, w->tag, GRPC_ERROR_REF(error),
- finished_completion, w, &w->completion_storage);
+ GRPC_ERROR_REF(error);
+ w->error = error;
+ w->phase = READY_TO_CALL_BACK;
break;
- case CALLING_BACK:
+ case READY_TO_CALL_BACK:
+ if (error != GRPC_ERROR_NONE) {
+ GPR_ASSERT(!due_to_completion);
+ GRPC_ERROR_UNREF(w->error);
+ GRPC_ERROR_REF(error);
+ w->error = error;
+ }
w->phase = CALLING_BACK_AND_FINISHED;
+ grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w,
+ &w->completion_storage);
break;
case CALLING_BACK_AND_FINISHED:
GPR_UNREACHABLE_CODE(return );
- case CALLED_BACK:
- delete = 1;
break;
}
gpr_mu_unlock(&w->mu);
- if (delete) {
- delete_state_watcher(exec_ctx, w);
- }
-
GRPC_ERROR_UNREF(error);
}
@@ -179,6 +183,28 @@ static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error));
}
+int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) {
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ return grpc_client_channel_num_external_connectivity_watchers(
+ client_channel_elem);
+}
+
+typedef struct watcher_timer_init_arg {
+ state_watcher *w;
+ gpr_timespec deadline;
+} watcher_timer_init_arg;
+
+static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error_ignored) {
+ watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg;
+
+ grpc_timer_init(exec_ctx, &wa->w->alarm,
+ gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC),
+ &wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_free(wa);
+}
+
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
@@ -208,16 +234,19 @@ void grpc_channel_watch_connectivity_state(
w->cq = cq;
w->tag = tag;
w->channel = channel;
+ w->error = NULL;
- grpc_timer_init(&exec_ctx, &w->alarm,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- &w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
+ watcher_timer_init_arg *wa = gpr_malloc(sizeof(watcher_timer_init_arg));
+ wa->w = w;
+ wa->deadline = deadline;
+ grpc_closure_init(&w->watcher_timer_init, watcher_timer_init, wa,
+ grpc_schedule_on_exec_ctx);
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
- grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
- grpc_cq_pollset(cq), &w->state,
- &w->on_complete);
+ grpc_client_channel_watch_connectivity_state(
+ &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state,
+ &w->on_complete, &w->watcher_timer_init);
} else {
abort();
}
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index f2f27b9175..8cebbe9eca 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -167,6 +167,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) {
return value;
}
+struct external_connectivity_watcher;
+
/*************************************************************************
* CHANNEL-WIDE FUNCTIONS
*/
@@ -204,6 +206,11 @@ typedef struct client_channel_channel_data {
/** interested parties (owned) */
grpc_pollset_set *interested_parties;
+ /* external_connectivity_watcher_list head is guarded by its own mutex, since
+ * counts need to be grabbed immediately without polling on a cq */
+ gpr_mu external_connectivity_watcher_list_mu;
+ struct external_connectivity_watcher *external_connectivity_watcher_list_head;
+
/* the following properties are guarded by a mutex since API's require them
to be instantaneously available */
gpr_mu info_mu;
@@ -661,6 +668,12 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
// Initialize data members.
chand->combiner = grpc_combiner_create(NULL);
gpr_mu_init(&chand->info_mu);
+ gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
+
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ chand->external_connectivity_watcher_list_head = NULL;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+
chand->owning_stack = args->channel_stack;
grpc_closure_init(&chand->on_resolver_result_changed,
on_resolver_result_changed_locked, chand,
@@ -749,6 +762,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
gpr_mu_destroy(&chand->info_mu);
+ gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}
/*************************************************************************
@@ -1431,14 +1445,79 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
return out;
}
-typedef struct {
+typedef struct external_connectivity_watcher {
channel_data *chand;
grpc_pollset *pollset;
grpc_closure *on_complete;
+ grpc_closure *watcher_timer_init;
grpc_connectivity_state *state;
grpc_closure my_closure;
+ struct external_connectivity_watcher *next;
} external_connectivity_watcher;
+static external_connectivity_watcher *lookup_external_connectivity_watcher(
+ channel_data *chand, grpc_closure *on_complete) {
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL && w->on_complete != on_complete) {
+ w = w->next;
+ }
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return w;
+}
+
+static void external_connectivity_watcher_list_append(
+ channel_data *chand, external_connectivity_watcher *w) {
+ GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
+
+ gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
+ GPR_ASSERT(!w->next);
+ w->next = chand->external_connectivity_watcher_list_head;
+ chand->external_connectivity_watcher_list_head = w;
+ gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
+}
+
+static void external_connectivity_watcher_list_remove(
+ channel_data *chand, external_connectivity_watcher *too_remove) {
+ GPR_ASSERT(
+ lookup_external_connectivity_watcher(chand, too_remove->on_complete));
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ if (too_remove == chand->external_connectivity_watcher_list_head) {
+ chand->external_connectivity_watcher_list_head = too_remove->next;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return;
+ }
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL) {
+ if (w->next == too_remove) {
+ w->next = w->next->next;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return;
+ }
+ w = w->next;
+ }
+ GPR_UNREACHABLE_CODE(return );
+}
+
+int grpc_client_channel_num_external_connectivity_watchers(
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ int count = 0;
+
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL) {
+ count++;
+ w = w->next;
+ }
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+
+ return count;
+}
+
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
external_connectivity_watcher *w = arg;
@@ -1447,6 +1526,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
w->pollset);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
+ external_connectivity_watcher_list_remove(w->chand, w);
gpr_free(w);
grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
}
@@ -1454,21 +1534,42 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
external_connectivity_watcher *w = arg;
- grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
- grpc_schedule_on_exec_ctx);
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
+ external_connectivity_watcher *found = NULL;
+ if (w->state != NULL) {
+ external_connectivity_watcher_list_append(w->chand, w);
+ grpc_closure_run(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
+ grpc_schedule_on_exec_ctx);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
+ } else {
+ GPR_ASSERT(w->watcher_timer_init == NULL);
+ found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
+ if (found) {
+ GPR_ASSERT(found->on_complete == w->on_complete);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
+ }
+ grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
+ w->pollset);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
+ "external_connectivity_watcher");
+ gpr_free(w);
+ }
}
void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *closure) {
+ grpc_connectivity_state *state, grpc_closure *closure,
+ grpc_closure *watcher_timer_init) {
channel_data *chand = elem->channel_data;
- external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
w->chand = chand;
w->pollset = pollset;
w->on_complete = closure;
w->state = state;
+ w->watcher_timer_init = watcher_timer_init;
+
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index 8d2490ea55..356a7ab0c1 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -53,9 +53,13 @@ extern const grpc_channel_filter grpc_client_channel_filter;
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
+int grpc_client_channel_num_external_connectivity_watchers(
+ grpc_channel_element *elem);
+
void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *on_complete);
+ grpc_connectivity_state *state, grpc_closure *on_complete,
+ grpc_closure *watcher_timer_init);
/* Debug helper: pull the subchannel call from a call stack element */
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 6e7f410635..7ee6ffb787 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -99,26 +99,13 @@ typedef struct pending_pick {
grpc_closure *on_complete;
} pending_pick;
-/** List of subchannels in a connectivity READY state */
-typedef struct ready_list {
- grpc_subchannel *subchannel;
- /* references namesake entry in subchannel_data */
- void *user_data;
- struct ready_list *next;
- struct ready_list *prev;
-} ready_list;
-
typedef struct {
- /** index within policy->subchannels */
- size_t index;
/** backpointer to owning policy */
round_robin_lb_policy *policy;
/** subchannel itself */
grpc_subchannel *subchannel;
/** notification that connectivity has changed on subchannel */
grpc_closure connectivity_changed_closure;
- /** this subchannels current position in subchannel->ready_list */
- ready_list *ready_list_node;
/** last observed connectivity. Not updated by
* \a grpc_subchannel_notify_on_state_change. Used to determine the previous
* state while processing the new state in \a rr_connectivity_changed */
@@ -126,6 +113,10 @@ typedef struct {
/** current connectivity state. Updated by \a
* grpc_subchannel_notify_on_state_change */
grpc_connectivity_state curr_connectivity_state;
+ /** connectivity state to be updated by the watcher, not guarded by
+ * the combiner. Will be moved to curr_connectivity_state inside of
+ * the combiner by rr_connectivity_changed_locked(). */
+ grpc_connectivity_state pending_connectivity_state_unsafe;
/** the subchannel's target user data */
void *user_data;
/** vtable to operate over \a user_data */
@@ -141,182 +132,106 @@ struct round_robin_lb_policy {
/** all our subchannels */
size_t num_subchannels;
- subchannel_data **subchannels;
+ subchannel_data *subchannels;
- /** how many subchannels are in TRANSIENT_FAILURE */
+ /** how many subchannels are in state READY */
+ size_t num_ready;
+ /** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
- /** how many subchannels are IDLE */
+ /** how many subchannels are in state IDLE */
size_t num_idle;
/** have we started picking? */
- int started_picking;
+ bool started_picking;
/** are we shutting down? */
- int shutdown;
+ bool shutdown;
/** List of picks that are waiting on connectivity */
pending_pick *pending_picks;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
- /** (Dummy) root of the doubly linked list containing READY subchannels */
- ready_list ready_list;
- /** Last pick from the ready list. */
- ready_list *ready_list_last_pick;
+ // Index into subchannels for last pick.
+ size_t last_ready_subchannel_index;
};
-/** Returns the next subchannel from the connected list or NULL if the list is
- * empty.
+/** Returns the index into p->subchannels of the next subchannel in
+ * READY state, or p->num_subchannels if no subchannel is READY.
*
- * Note that this function does *not* advance p->ready_list_last_pick. Use \a
- * advance_last_picked_locked() for that. */
-static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) {
- ready_list *selected;
- selected = p->ready_list_last_pick->next;
-
- while (selected != NULL) {
- if (selected == &p->ready_list) {
- GPR_ASSERT(selected->subchannel == NULL);
- /* skip dummy root */
- selected = selected->next;
- } else {
- GPR_ASSERT(selected->subchannel != NULL);
- return selected;
- }
+ * Note that this function does *not* update p->last_ready_subchannel_index.
+ * The caller must do that if it returns a pick. */
+static size_t get_next_ready_subchannel_index_locked(
+ const round_robin_lb_policy *p) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO,
+ "[RR: %p] getting next ready subchannel, "
+ "last_ready_subchannel_index=%lu",
+ p, (unsigned long)p->last_ready_subchannel_index);
}
- return NULL;
-}
-
-/** Advance the \a ready_list picking head. */
-static void advance_last_picked_locked(round_robin_lb_policy *p) {
- if (p->ready_list_last_pick->next != NULL) { /* non-empty list */
- p->ready_list_last_pick = p->ready_list_last_pick->next;
- if (p->ready_list_last_pick == &p->ready_list) {
- /* skip dummy root */
- p->ready_list_last_pick = p->ready_list_last_pick->next;
+ for (size_t i = 0; i < p->num_subchannels; ++i) {
+ const size_t index =
+ (i + p->last_ready_subchannel_index + 1) % p->num_subchannels;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p,
+ (unsigned long)index,
+ p->subchannels[index].curr_connectivity_state);
+ }
+ if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu",
+ p, (unsigned long)index);
+ }
+ return index;
}
- } else { /* should be an empty list */
- GPR_ASSERT(p->ready_list_last_pick == &p->ready_list);
}
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, "
- "CSC %p)",
- (void *)p, (void *)p->ready_list_last_pick,
- (void *)p->ready_list_last_pick->subchannel,
- (void *)grpc_subchannel_get_connected_subchannel(
- p->ready_list_last_pick->subchannel));
+ gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p);
}
+ return p->num_subchannels;
}
-/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
- * csc to the list of ready subchannels. */
-static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- subchannel_data *sd) {
- ready_list *new_elem = gpr_zalloc(sizeof(ready_list));
- new_elem->subchannel = sd->subchannel;
- new_elem->user_data = sd->user_data;
- if (p->ready_list.prev == NULL) {
- /* first element */
- new_elem->next = &p->ready_list;
- new_elem->prev = &p->ready_list;
- p->ready_list.next = new_elem;
- p->ready_list.prev = new_elem;
- } else {
- new_elem->next = &p->ready_list;
- new_elem->prev = p->ready_list.prev;
- p->ready_list.prev->next = new_elem;
- p->ready_list.prev = new_elem;
- }
+// Sets p->last_ready_subchannel_index to last_ready_index.
+static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
+ size_t last_ready_index) {
+ GPR_ASSERT(last_ready_index < p->num_subchannels);
+ p->last_ready_subchannel_index = last_ready_index;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
- (void *)new_elem, (void *)sd->subchannel);
- }
- return new_elem;
-}
-
-/** Removes \a node from the list of connected subchannels */
-static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
- ready_list *node) {
- if (node == NULL) {
- return;
- }
- if (node == p->ready_list_last_pick) {
- p->ready_list_last_pick = p->ready_list_last_pick->prev;
- }
-
- /* removing last item */
- if (node->next == &p->ready_list && node->prev == &p->ready_list) {
- GPR_ASSERT(p->ready_list.next == node);
- GPR_ASSERT(p->ready_list.prev == node);
- p->ready_list.next = NULL;
- p->ready_list.prev = NULL;
- } else {
- node->prev->next = node->next;
- node->next->prev = node->prev;
- }
-
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
- (void *)node->subchannel);
+ gpr_log(GPR_DEBUG,
+ "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
+ (void *)p, (unsigned long)last_ready_index,
+ (void *)p->subchannels[last_ready_index].subchannel,
+ (void *)grpc_subchannel_get_connected_subchannel(
+ p->subchannels[last_ready_index].subchannel));
}
-
- node->next = NULL;
- node->prev = NULL;
- node->subchannel = NULL;
-
- gpr_free(node);
-}
-
-static bool is_ready_list_empty(round_robin_lb_policy *p) {
- return p->ready_list.prev == NULL;
}
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- ready_list *elem;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol);
}
-
for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
- if (sd->user_data != NULL) {
- GPR_ASSERT(sd->user_data_vtable != NULL);
- sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ subchannel_data *sd = &p->subchannels[i];
+ if (sd->subchannel != NULL) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ }
}
- gpr_free(sd);
}
-
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
-
- elem = p->ready_list.next;
- while (elem != NULL && elem != &p->ready_list) {
- ready_list *tmp;
- tmp = elem->next;
- elem->next = NULL;
- elem->prev = NULL;
- elem->subchannel = NULL;
- gpr_free(elem);
- elem = tmp;
- }
-
gpr_free(p);
}
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- size_t i;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol);
}
-
- p->shutdown = 1;
+ p->shutdown = true;
+ pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
@@ -328,10 +243,13 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
- for (i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
- &sd->connectivity_changed_closure);
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ subchannel_data *sd = &p->subchannels[i];
+ if (sd->subchannel != NULL) {
+ grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
+ NULL,
+ &sd->connectivity_changed_closure);
+ }
}
}
@@ -339,8 +257,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- pp = p->pending_picks;
+ pending_pick *pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
@@ -364,8 +281,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- pp = p->pending_picks;
+ pending_pick *pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
@@ -387,21 +303,16 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) {
- size_t i;
- p->started_picking = 1;
-
- for (i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- /* use some sentinel value outside of the range of grpc_connectivity_state
- * to signal an undefined previous state. We won't be referring to this
- * value again and it'll be overwritten after the first call to
- * rr_connectivity_changed */
- sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
- GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
+ p->started_picking = true;
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ subchannel_data *sd = &p->subchannels[i];
+ if (sd->subchannel != NULL) {
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
+ }
}
}
@@ -418,36 +329,32 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- ready_list *selected;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
}
-
- if ((selected = peek_next_connected_locked(p))) {
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->num_subchannels) {
/* readily available, report right away */
+ subchannel_data *sd = &p->subchannels[next_ready_index];
*target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "rr_picked");
-
+ grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked");
if (user_data != NULL) {
- *user_data = selected->user_data;
+ *user_data = sd->user_data;
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
- (void *)*target, (void *)selected);
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)",
+ (void *)*target, (unsigned long)next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
- advance_last_picked_locked(p);
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
return 1;
} else {
/* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking_locked(exec_ctx, p);
}
- pp = gpr_malloc(sizeof(*pp));
+ pending_pick *pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->target = target;
pp->on_complete = on_complete;
@@ -458,25 +365,31 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
-static void update_state_counters(subchannel_data *sd) {
+static void update_state_counters_locked(subchannel_data *sd) {
round_robin_lb_policy *p = sd->policy;
-
- /* update p->num_transient_failures (resp. p->num_idle): if the previous
- * state was TRANSIENT_FAILURE (resp. IDLE), decrement
- * p->num_transient_failures (resp. p->num_idle). */
- if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
+ GPR_ASSERT(p->num_ready > 0);
+ --p->num_ready;
+ } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(p->num_transient_failures > 0);
--p->num_transient_failures;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(p->num_idle > 0);
--p->num_idle;
}
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ ++p->num_ready;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ ++p->num_transient_failures;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
+ ++p->num_idle;
+ }
}
/* sd is the subchannel_data associted with the updated subchannel.
* shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE
* or SHUTDOWN */
-static grpc_connectivity_state update_lb_connectivity_status(
+static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
@@ -498,7 +411,7 @@ static grpc_connectivity_state update_lb_connectivity_status(
* CHECK: p->num_idle == p->num_subchannels.
*/
round_robin_lb_policy *p = sd->policy;
- if (!is_ready_list_empty(p)) { /* 1) READY */
+ if (p->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
return GRPC_CHANNEL_READY;
@@ -532,32 +445,62 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
- pending_pick *pp;
-
- GRPC_ERROR_REF(error);
-
+ // Now that we're inside the combiner, copy the pending connectivity
+ // state (which was set by the connectivity state watcher) to
+ // curr_connectivity_state, which is what we use inside of the combiner.
+ sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] connectivity changed for subchannel %p: "
+ "prev_state=%d new_state=%d",
+ p, sd->subchannel, sd->prev_connectivity_state,
+ sd->curr_connectivity_state);
+ }
+ // If we're shutting down, unref and return.
if (p->shutdown) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- GRPC_ERROR_UNREF(error);
return;
}
- switch (sd->curr_connectivity_state) {
- case GRPC_CHANNEL_INIT:
- GPR_UNREACHABLE_CODE(return );
- case GRPC_CHANNEL_READY:
- /* add the newly connected subchannel to the list of connected ones.
- * Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd);
+ // Update state counters and determine new overall state.
+ update_state_counters_locked(sd);
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
+ grpc_connectivity_state new_connectivity_state =
+ update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
+ // If the new state is SHUTDOWN, unref the subchannel, and if the new
+ // overall state is SHUTDOWN, clean up.
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
+ sd->subchannel = NULL;
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ }
+ if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ /* the policy is shutting down. Flush all the pending picks... */
+ pending_pick *pp;
+ while ((pp = p->pending_picks)) {
+ p->pending_picks = pp->next;
+ *pp->target = NULL;
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ gpr_free(pp);
+ }
+ }
+ /* unref the "rr_connectivity" weak ref from start_picking */
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
+ } else {
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
- ready_list *selected = peek_next_connected_locked(p);
- GPR_ASSERT(selected != NULL);
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ GPR_ASSERT(next_ready_index < p->num_subchannels);
+ subchannel_data *selected = &p->subchannels[next_ready_index];
if (p->pending_picks != NULL) {
/* if the selected subchannel is going to be used for the pending
* picks, update the last picked pointer */
- advance_last_picked_locked(p);
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
}
+ pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
@@ -568,72 +511,20 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
- "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- (void *)selected->subchannel, (void *)selected);
+ "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %lu)",
+ (void *)selected->subchannel,
+ (unsigned long)next_ready_index);
}
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_IDLE:
- ++p->num_idle;
- /* fallthrough */
- case GRPC_CHANNEL_CONNECTING:
- update_state_counters(sd);
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- ++p->num_transient_failures;
- /* remove from ready list if still present */
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_SHUTDOWN:
- update_state_counters(sd);
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
- --p->num_subchannels;
- GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
- p->subchannels[p->num_subchannels]);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
- p->subchannels[sd->index]->index = sd->index;
- if (update_lb_connectivity_status(exec_ctx, sd, error) ==
- GRPC_CHANNEL_SHUTDOWN) {
- /* the policy is shutting down. Flush all the pending picks... */
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
- gpr_free(pp);
- }
- }
- gpr_free(sd);
- /* unref the "rr_connectivity" weak ref from start_picking */
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- break;
+ }
+ /* renew notification: reuses the "rr_connectivity" weak ref */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
- GRPC_ERROR_UNREF(error);
}
static grpc_connectivity_state rr_check_connectivity_locked(
@@ -654,10 +545,10 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- ready_list *selected;
- grpc_connected_subchannel *target;
- if ((selected = peek_next_connected_locked(p))) {
- target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->num_subchannels) {
+ subchannel_data *selected = &p->subchannels[next_ready_index];
+ grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
"rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure);
@@ -708,7 +599,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
- size_t subchannel_idx = 0;
+ size_t subchannel_index = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */
if (addresses->addresses[i].is_balancer) continue;
@@ -727,42 +618,44 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG, "Created subchannel %p for address uri %s",
- (void *)subchannel, address_uri);
+ gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s",
+ (unsigned long)subchannel_index, (void *)subchannel, address_uri);
gpr_free(address_uri);
}
grpc_channel_args_destroy(exec_ctx, new_args);
if (subchannel != NULL) {
- subchannel_data *sd = gpr_zalloc(sizeof(*sd));
- p->subchannels[subchannel_idx] = sd;
+ subchannel_data *sd = &p->subchannels[subchannel_index];
sd->policy = p;
- sd->index = subchannel_idx;
sd->subchannel = subchannel;
+ /* use some sentinel value outside of the range of grpc_connectivity_state
+ * to signal an undefined previous state. We won't be referring to this
+ * value again and it'll be overwritten after the first call to
+ * rr_connectivity_changed */
+ sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
+ sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
sd->user_data_vtable = addresses->user_data_vtable;
if (sd->user_data_vtable != NULL) {
sd->user_data =
sd->user_data_vtable->copy(addresses->addresses[i].user_data);
}
- ++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed_locked, sd,
grpc_combiner_scheduler(args->combiner, false));
+ ++subchannel_index;
}
}
- if (subchannel_idx == 0) {
+ if (subchannel_index == 0) {
/* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels);
gpr_free(p);
return NULL;
}
- p->num_subchannels = subchannel_idx;
+ p->num_subchannels = subchannel_index;
- /* The (dummy node) root of the ready list */
- p->ready_list.subchannel = NULL;
- p->ready_list.prev = NULL;
- p->ready_list.next = NULL;
- p->ready_list_last_pick = &p->ready_list;
+ // Initialize the last pick index to the last subchannel, so that the
+ // first pick will start at the beginning of the list.
+ p->last_ready_subchannel_index = subchannel_index - 1;
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
index b9c62c376a..2c076e821c 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.c
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -127,6 +127,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
gpr_mu_lock(&state->mu);
if (state->shutdown) {
gpr_mu_unlock(&state->mu);
+ grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_NONE);
grpc_endpoint_destroy(exec_ctx, tcp);
gpr_free(acceptor);
return;
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index 0ac2c2ad52..7012ffe568 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -105,7 +105,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
grpc_error *error) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent,
req->context->pollset_set);
- grpc_closure_sched(exec_ctx, req->on_done, GRPC_ERROR_REF(error));
+ grpc_closure_sched(exec_ctx, req->on_done, error);
grpc_http_parser_destroy(&req->parser);
if (req->addresses != NULL) {
grpc_resolved_addresses_destroy(req->addresses);
@@ -244,7 +244,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req,
static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
internal_request *req = arg;
if (error != GRPC_ERROR_NONE) {
- finish(exec_ctx, req, error);
+ finish(exec_ctx, req, GRPC_ERROR_REF(error));
return;
}
req->next_address = 0;
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c
index 76946434f0..ea7c1122c1 100644
--- a/src/core/lib/http/httpcli_security_connector.c
+++ b/src/core/lib/http/httpcli_security_connector.c
@@ -44,6 +44,7 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
#include "src/core/tsi/ssl_transport_security.h"
+#include "src/core/tsi/transport_security_adapter.h"
typedef struct {
grpc_channel_security_connector base;
@@ -78,7 +79,8 @@ static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx,
}
grpc_handshake_manager_add(
handshake_mgr,
- grpc_security_handshaker_create(exec_ctx, handshaker, &sc->base));
+ grpc_security_handshaker_create(
+ exec_ctx, tsi_create_adapter_handshaker(handshaker), &sc->base));
}
static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index 30431a4e4a..416a3bdb35 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -56,6 +56,7 @@
#include "src/core/lib/support/string.h"
#include "src/core/tsi/fake_transport_security.h"
#include "src/core/tsi/ssl_transport_security.h"
+#include "src/core/tsi/transport_security_adapter.h"
/* -- Constants. -- */
@@ -390,7 +391,8 @@ static void fake_channel_add_handshakers(
grpc_handshake_manager_add(
handshake_mgr,
grpc_security_handshaker_create(
- exec_ctx, tsi_create_fake_handshaker(true /* is_client */),
+ exec_ctx, tsi_create_adapter_handshaker(
+ tsi_create_fake_handshaker(true /* is_client */)),
&sc->base));
}
@@ -400,7 +402,8 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx,
grpc_handshake_manager_add(
handshake_mgr,
grpc_security_handshaker_create(
- exec_ctx, tsi_create_fake_handshaker(false /* is_client */),
+ exec_ctx, tsi_create_adapter_handshaker(
+ tsi_create_fake_handshaker(false /* is_client */)),
&sc->base));
}
@@ -495,8 +498,10 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx,
}
// Create handshakers.
- grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create(
- exec_ctx, tsi_hs, &sc->base));
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ grpc_security_handshaker_create(
+ exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base));
}
static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
@@ -515,8 +520,10 @@ static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
}
// Create handshakers.
- grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create(
- exec_ctx, tsi_hs, &sc->base));
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ grpc_security_handshaker_create(
+ exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base));
}
static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) {
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index 509b4b556d..3bc113e20f 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -71,12 +71,12 @@ typedef struct {
unsigned char *handshake_buffer;
size_t handshake_buffer_size;
- grpc_slice_buffer left_overs;
grpc_slice_buffer outgoing;
grpc_closure on_handshake_data_sent_to_peer;
grpc_closure on_handshake_data_received_from_peer;
grpc_closure on_peer_checked;
grpc_auth_context *auth_context;
+ tsi_handshaker_result *handshaker_result;
} security_handshaker;
static void security_handshaker_unref(grpc_exec_ctx *exec_ctx,
@@ -84,6 +84,7 @@ static void security_handshaker_unref(grpc_exec_ctx *exec_ctx,
if (gpr_unref(&h->refs)) {
gpr_mu_destroy(&h->mu);
tsi_handshaker_destroy(h->handshaker);
+ tsi_handshaker_result_destroy(h->handshaker_result);
if (h->endpoint_to_destroy != NULL) {
grpc_endpoint_destroy(exec_ctx, h->endpoint_to_destroy);
}
@@ -92,7 +93,6 @@ static void security_handshaker_unref(grpc_exec_ctx *exec_ctx,
gpr_free(h->read_buffer_to_destroy);
}
gpr_free(h->handshake_buffer);
- grpc_slice_buffer_destroy_internal(exec_ctx, &h->left_overs);
grpc_slice_buffer_destroy_internal(exec_ctx, &h->outgoing);
GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake");
GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, h->connector, "handshake");
@@ -150,10 +150,10 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error));
goto done;
}
- // Get frame protector.
+ // Create frame protector.
tsi_frame_protector *protector;
- tsi_result result =
- tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector);
+ tsi_result result = tsi_handshaker_result_create_frame_protector(
+ h->handshaker_result, NULL, &protector);
if (result != TSI_OK) {
error = grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"),
@@ -161,14 +161,25 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
security_handshake_failed_locked(exec_ctx, h, error);
goto done;
}
- // Success.
+ // Get unused bytes.
+ unsigned char *unused_bytes = NULL;
+ size_t unused_bytes_size = 0;
+ result = tsi_handshaker_result_get_unused_bytes(
+ h->handshaker_result, &unused_bytes, &unused_bytes_size);
// Create secure endpoint.
- h->args->endpoint = grpc_secure_endpoint_create(
- protector, h->args->endpoint, h->left_overs.slices, h->left_overs.count);
- h->left_overs.count = 0;
- h->left_overs.length = 0;
- // Clear out the read buffer before it gets passed to the transport,
- // since any excess bytes were already copied to h->left_overs.
+ if (unused_bytes_size > 0) {
+ grpc_slice slice =
+ grpc_slice_from_copied_buffer((char *)unused_bytes, unused_bytes_size);
+ h->args->endpoint =
+ grpc_secure_endpoint_create(protector, h->args->endpoint, &slice, 1);
+ grpc_slice_unref_internal(exec_ctx, slice);
+ } else {
+ h->args->endpoint =
+ grpc_secure_endpoint_create(protector, h->args->endpoint, NULL, 0);
+ }
+ tsi_handshaker_result_destroy(h->handshaker_result);
+ h->handshaker_result = NULL;
+ // Clear out the read buffer before it gets passed to the transport.
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, h->args->read_buffer);
// Add auth context to channel args.
grpc_arg auth_context_arg = grpc_auth_context_to_arg(h->auth_context);
@@ -189,7 +200,8 @@ done:
static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx,
security_handshaker *h) {
tsi_peer peer;
- tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer);
+ tsi_result result =
+ tsi_handshaker_result_extract_peer(h->handshaker_result, &peer);
if (result != TSI_OK) {
return grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Peer extraction failed"), result);
@@ -199,34 +211,87 @@ static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
-static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx,
- security_handshaker *h) {
- // Get data to send.
- tsi_result result = TSI_OK;
- size_t offset = 0;
- do {
- size_t to_send_size = h->handshake_buffer_size - offset;
- result = tsi_handshaker_get_bytes_to_send_to_peer(
- h->handshaker, h->handshake_buffer + offset, &to_send_size);
- offset += to_send_size;
- if (result == TSI_INCOMPLETE_DATA) {
- h->handshake_buffer_size *= 2;
- h->handshake_buffer =
- gpr_realloc(h->handshake_buffer, h->handshake_buffer_size);
- }
- } while (result == TSI_INCOMPLETE_DATA);
+static grpc_error *on_handshake_next_done_locked(
+ grpc_exec_ctx *exec_ctx, security_handshaker *h, tsi_result result,
+ const unsigned char *bytes_to_send, size_t bytes_to_send_size,
+ tsi_handshaker_result *handshaker_result) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ // Read more if we need to.
+ if (result == TSI_INCOMPLETE_DATA) {
+ GPR_ASSERT(bytes_to_send_size == 0);
+ grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
+ &h->on_handshake_data_received_from_peer);
+ return error;
+ }
if (result != TSI_OK) {
return grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result);
}
- // Send data.
- grpc_slice to_send =
- grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset);
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing);
- grpc_slice_buffer_add(&h->outgoing, to_send);
- grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing,
- &h->on_handshake_data_sent_to_peer);
- return GRPC_ERROR_NONE;
+ // Update handshaker result.
+ if (handshaker_result != NULL) {
+ GPR_ASSERT(h->handshaker_result == NULL);
+ h->handshaker_result = handshaker_result;
+ }
+ if (bytes_to_send_size > 0) {
+ // Send data to peer, if needed.
+ grpc_slice to_send = grpc_slice_from_copied_buffer(
+ (const char *)bytes_to_send, bytes_to_send_size);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing);
+ grpc_slice_buffer_add(&h->outgoing, to_send);
+ grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing,
+ &h->on_handshake_data_sent_to_peer);
+ } else if (handshaker_result == NULL) {
+ // There is nothing to send, but need to read from peer.
+ grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
+ &h->on_handshake_data_received_from_peer);
+ } else {
+ // Handshake has finished, check peer and so on.
+ error = check_peer_locked(exec_ctx, h);
+ }
+ return error;
+}
+
+static void on_handshake_next_done_grpc_wrapper(
+ tsi_result result, void *user_data, const unsigned char *bytes_to_send,
+ size_t bytes_to_send_size, tsi_handshaker_result *handshaker_result) {
+ security_handshaker *h = user_data;
+ // This callback will be invoked by TSI in a non-grpc thread, so it's
+ // safe to create our own exec_ctx here.
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_lock(&h->mu);
+ grpc_error *error =
+ on_handshake_next_done_locked(&exec_ctx, h, result, bytes_to_send,
+ bytes_to_send_size, handshaker_result);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_failed_locked(&exec_ctx, h, error);
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(&exec_ctx, h);
+ } else {
+ gpr_mu_unlock(&h->mu);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static grpc_error *do_handshaker_next_locked(
+ grpc_exec_ctx *exec_ctx, security_handshaker *h,
+ const unsigned char *bytes_received, size_t bytes_received_size) {
+ // Invoke TSI handshaker.
+ unsigned char *bytes_to_send = NULL;
+ size_t bytes_to_send_size = 0;
+ tsi_handshaker_result *handshaker_result = NULL;
+ tsi_result result = tsi_handshaker_next(
+ h->handshaker, bytes_received, bytes_received_size, &bytes_to_send,
+ &bytes_to_send_size, &handshaker_result,
+ &on_handshake_next_done_grpc_wrapper, h);
+ if (result == TSI_ASYNC) {
+ // Handshaker operating asynchronously. Nothing else to do here;
+ // callback will be invoked in a TSI thread.
+ return GRPC_ERROR_NONE;
+ }
+ // Handshaker returned synchronously. Invoke callback directly in
+ // this thread with our existing exec_ctx.
+ return on_handshake_next_done_locked(exec_ctx, h, result, bytes_to_send,
+ bytes_to_send_size, handshaker_result);
}
static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
@@ -241,72 +306,34 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
security_handshaker_unref(exec_ctx, h);
return;
}
- // Process received data.
- tsi_result result = TSI_OK;
- size_t consumed_slice_size = 0;
+ // Copy all slices received.
size_t i;
+ size_t bytes_received_size = 0;
for (i = 0; i < h->args->read_buffer->count; i++) {
- consumed_slice_size = GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]);
- result = tsi_handshaker_process_bytes_from_peer(
- h->handshaker, GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]),
- &consumed_slice_size);
- if (!tsi_handshaker_is_in_progress(h->handshaker)) break;
+ bytes_received_size += GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]);
}
- if (tsi_handshaker_is_in_progress(h->handshaker)) {
- /* We may need more data. */
- if (result == TSI_INCOMPLETE_DATA) {
- grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
- &h->on_handshake_data_received_from_peer);
- goto done;
- } else {
- error = send_handshake_bytes_to_peer_locked(exec_ctx, h);
- if (error != GRPC_ERROR_NONE) {
- security_handshake_failed_locked(exec_ctx, h, error);
- gpr_mu_unlock(&h->mu);
- security_handshaker_unref(exec_ctx, h);
- return;
- }
- goto done;
- }
+ if (bytes_received_size > h->handshake_buffer_size) {
+ h->handshake_buffer = gpr_realloc(h->handshake_buffer, bytes_received_size);
+ h->handshake_buffer_size = bytes_received_size;
}
- if (result != TSI_OK) {
- security_handshake_failed_locked(
- exec_ctx, h,
- grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result));
- gpr_mu_unlock(&h->mu);
- security_handshaker_unref(exec_ctx, h);
- return;
- }
- /* Handshake is done and successful this point. */
- bool has_left_overs_in_current_slice =
- (consumed_slice_size <
- GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]));
- size_t num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) +
- h->args->read_buffer->count - i - 1;
- if (num_left_overs > 0) {
- /* Put the leftovers in our buffer (ownership transfered). */
- if (has_left_overs_in_current_slice) {
- grpc_slice tail = grpc_slice_split_tail(&h->args->read_buffer->slices[i],
- consumed_slice_size);
- grpc_slice_buffer_add(&h->left_overs, tail);
- /* split_tail above increments refcount. */
- grpc_slice_unref_internal(exec_ctx, tail);
- }
- grpc_slice_buffer_addn(
- &h->left_overs, &h->args->read_buffer->slices[i + 1],
- num_left_overs - (size_t)has_left_overs_in_current_slice);
+ size_t offset = 0;
+ for (i = 0; i < h->args->read_buffer->count; i++) {
+ size_t slice_size = GPR_SLICE_LENGTH(h->args->read_buffer->slices[i]);
+ memcpy(h->handshake_buffer + offset,
+ GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]), slice_size);
+ offset += slice_size;
}
- // Check peer.
- error = check_peer_locked(exec_ctx, h);
+ // Call TSI handshaker.
+ error = do_handshaker_next_locked(exec_ctx, h, h->handshake_buffer,
+ bytes_received_size);
+
if (error != GRPC_ERROR_NONE) {
security_handshake_failed_locked(exec_ctx, h, error);
gpr_mu_unlock(&h->mu);
security_handshaker_unref(exec_ctx, h);
- return;
+ } else {
+ gpr_mu_unlock(&h->mu);
}
-done:
- gpr_mu_unlock(&h->mu);
}
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg,
@@ -321,8 +348,8 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg,
security_handshaker_unref(exec_ctx, h);
return;
}
- /* We may be done. */
- if (tsi_handshaker_is_in_progress(h->handshaker)) {
+ // We may be done.
+ if (h->handshaker_result == NULL) {
grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
&h->on_handshake_data_received_from_peer);
} else {
@@ -371,7 +398,7 @@ static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
h->args = args;
h->on_handshake_done = on_handshake_done;
gpr_ref(&h->refs);
- grpc_error *error = send_handshake_bytes_to_peer_locked(exec_ctx, h);
+ grpc_error *error = do_handshaker_next_locked(exec_ctx, h, NULL, 0);
if (error != GRPC_ERROR_NONE) {
security_handshake_failed_locked(exec_ctx, h, error);
gpr_mu_unlock(&h->mu);
@@ -404,7 +431,6 @@ static grpc_handshaker *security_handshaker_create(
grpc_schedule_on_exec_ctx);
grpc_closure_init(&h->on_peer_checked, on_peer_checked, h,
grpc_schedule_on_exec_ctx);
- grpc_slice_buffer_init(&h->left_overs);
grpc_slice_buffer_init(&h->outgoing);
return &h->base;
}
diff --git a/src/cpp/common/version_cc.cc b/src/cpp/common/version_cc.cc
index 72a4c4cf94..e4f5cb8422 100644
--- a/src/cpp/common/version_cc.cc
+++ b/src/cpp/common/version_cc.cc
@@ -37,5 +37,5 @@
#include <grpc++/grpc++.h>
namespace grpc {
-grpc::string Version() { return "1.4.0-dev"; }
+grpc::string Version() { return "1.5.0-dev"; }
}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 6dca6a6862..e20e933374 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -172,14 +172,25 @@ ServerBuilder& ServerBuilder::SetResourceQuota(
}
ServerBuilder& ServerBuilder::AddListeningPort(
- const grpc::string& addr, std::shared_ptr<ServerCredentials> creds,
+ const grpc::string& addr_uri, std::shared_ptr<ServerCredentials> creds,
int* selected_port) {
+ const grpc::string uri_scheme = "dns:";
+ grpc::string addr = addr_uri;
+ if (addr_uri.compare(0, uri_scheme.size(), uri_scheme) == 0) {
+ size_t pos = uri_scheme.size();
+ while (addr_uri[pos] == '/') ++pos; // Skip slashes.
+ addr = addr_uri.substr(pos);
+ }
Port port = {addr, creds, selected_port};
ports_.push_back(port);
return *this;
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ (*plugin)->UpdateServerBuilder(this);
+ }
+
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
(*option)->UpdateArguments(&args);
diff --git a/src/csharp/.editorconfig b/src/csharp/.editorconfig
new file mode 100644
index 0000000000..7bc2bcce18
--- /dev/null
+++ b/src/csharp/.editorconfig
@@ -0,0 +1,7 @@
+root = true
+[**]
+end_of_line = LF
+indent_style = space
+indent_size = 4
+insert_final_newline = true
+tab_width = 4
diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.csproj b/src/csharp/Grpc.Auth/Grpc.Auth.csproj
index 6ac25aa1f0..6030c70783 100755
--- a/src/csharp/Grpc.Auth/Grpc.Auth.csproj
+++ b/src/csharp/Grpc.Auth/Grpc.Auth.csproj
@@ -16,6 +16,9 @@
<PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion>
+ <IncludeSymbols>true</IncludeSymbols>
+ <IncludeSource>true</IncludeSource>
+ <GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
<ItemGroup>
@@ -23,7 +26,9 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
+ <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj">
+ <PrivateAssets>None</PrivateAssets>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj b/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj
index f4dd5105fc..4e186d14dc 100755
--- a/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj
+++ b/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj
@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\Grpc.Core\Version.csproj.include" />
<Import Project="..\Grpc.Core\Common.csproj.include" />
@@ -16,6 +16,9 @@
<PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion>
+ <IncludeSymbols>true</IncludeSymbols>
+ <IncludeSource>true</IncludeSource>
+ <GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
<ItemGroup>
@@ -23,12 +26,12 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
+ <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj">
+ <PrivateAssets>None</PrivateAssets>
+ </ProjectReference>
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
- <Reference Include="System.Runtime" />
- <Reference Include="System.IO" />
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 4f29c35b32..51ae11fbde 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -59,6 +59,8 @@ namespace Grpc.Core
readonly ChannelSafeHandle handle;
readonly Dictionary<string, ChannelOption> options;
+ readonly Task connectivityWatcherTask;
+
bool shutdownRequested;
/// <summary>
@@ -99,6 +101,9 @@ namespace Grpc.Core
this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs);
}
}
+ // TODO(jtattermusch): Workaround for https://github.com/GoogleCloudPlatform/google-cloud-dotnet/issues/822.
+ // Remove once retries are supported in C core
+ this.connectivityWatcherTask = RunConnectivityWatcherAsync();
GrpcEnvironment.RegisterChannel(this);
}
@@ -244,7 +249,7 @@ namespace Grpc.Core
handle.Dispose();
- await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
+ await Task.WhenAll(GrpcEnvironment.ReleaseAsync(), connectivityWatcherTask).ConfigureAwait(false);
}
internal ChannelSafeHandle Handle
@@ -299,6 +304,40 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Constantly Watches channel connectivity status to work around https://github.com/GoogleCloudPlatform/google-cloud-dotnet/issues/822
+ /// </summary>
+ private async Task RunConnectivityWatcherAsync()
+ {
+ try
+ {
+ var lastState = State;
+ while (lastState != ChannelState.Shutdown)
+ {
+ lock (myLock)
+ {
+ if (shutdownRequested)
+ {
+ break;
+ }
+ }
+
+ try
+ {
+ await WaitForStateChangedAsync(lastState, DateTime.UtcNow.AddSeconds(1)).ConfigureAwait(false);
+ }
+ catch (TaskCanceledException)
+ {
+ // ignore timeout
+ }
+ lastState = State;
+ }
+ }
+ catch (ObjectDisposedException) {
+ // during shutdown, channel is going to be disposed.
+ }
+ }
+
private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)
{
var key = ChannelOptions.PrimaryUserAgentString;
diff --git a/src/csharp/Grpc.Core/Common.csproj.include b/src/csharp/Grpc.Core/Common.csproj.include
index 2cb990ba49..3b1bec2d55 100755
--- a/src/csharp/Grpc.Core/Common.csproj.include
+++ b/src/csharp/Grpc.Core/Common.csproj.include
@@ -13,10 +13,6 @@
</PropertyGroup>
<PropertyGroup>
- <GenerateDocumentationFile>true</GenerateDocumentationFile>
- </PropertyGroup>
-
- <PropertyGroup>
<DefineConstants>$(DefineConstants);SIGNED</DefineConstants>
<AssemblyOriginatorKeyFile>../keys/Grpc.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index c0865001a8..50358298f4 100755
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
<Import Project="Version.csproj.include" />
<Import Project="Common.csproj.include" />
@@ -15,6 +15,9 @@
<PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion>
+ <IncludeSymbols>true</IncludeSymbols>
+ <IncludeSource>true</IncludeSource>
+ <GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
<ItemGroup>
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 8ed0c0b92f..bc74e212b1 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -218,5 +218,16 @@ namespace Grpc.Core.Internal
{
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
+
+ /// <summary>
+ /// Only for testing.
+ /// </summary>
+ public static CallSafeHandle CreateFake(IntPtr ptr, CompletionQueueSafeHandle cq)
+ {
+ var call = new CallSafeHandle();
+ call.SetHandle(ptr);
+ call.Initialize(cq);
+ return call;
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
index a4aa8d3ffe..075286d33e 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
@@ -52,6 +52,7 @@ namespace Grpc.Core.Internal
readonly GrpcEnvironment environment;
readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>(new IntPtrComparer());
+ IntPtr lastRegisteredKey; // only for testing
public CompletionRegistry(GrpcEnvironment environment)
{
@@ -62,6 +63,7 @@ namespace Grpc.Core.Internal
{
environment.DebugStats.PendingBatchCompletions.Increment();
GrpcPreconditions.CheckState(dict.TryAdd(key, callback));
+ this.lastRegisteredKey = key;
}
public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
@@ -84,6 +86,14 @@ namespace Grpc.Core.Internal
return value;
}
+ /// <summary>
+ /// For testing purposes only.
+ /// </summary>
+ public IntPtr LastRegisteredKey
+ {
+ get { return this.lastRegisteredKey; }
+ }
+
private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
{
try
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index 696987d2a8..e703e3e6ce 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -164,6 +164,8 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_test_callback_delegate grpcsharp_test_callback;
public readonly Delegates.grpcsharp_test_nop_delegate grpcsharp_test_nop;
+ public readonly Delegates.grpcsharp_test_override_method_delegate grpcsharp_test_override_method;
+
#endregion
public NativeMethods(UnmanagedLibrary library)
@@ -278,6 +280,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_test_callback = GetMethodDelegate<Delegates.grpcsharp_test_callback_delegate>(library);
this.grpcsharp_test_nop = GetMethodDelegate<Delegates.grpcsharp_test_nop_delegate>(library);
+ this.grpcsharp_test_override_method = GetMethodDelegate<Delegates.grpcsharp_test_override_method_delegate>(library);
}
/// <summary>
@@ -434,6 +437,7 @@ namespace Grpc.Core.Internal
public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
public delegate IntPtr grpcsharp_test_nop_delegate(IntPtr ptr);
+ public delegate void grpcsharp_test_override_method_delegate(string methodName, string variant);
}
}
}
diff --git a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs
index 77ac347c7d..fe757820fd 100644
--- a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs
+++ b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs
@@ -59,8 +59,14 @@ using System.Runtime.CompilerServices;
"0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
"27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
"71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
+[assembly: InternalsVisibleTo("Grpc.Microbenchmarks,PublicKey=" +
+ "00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" +
+ "0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
+ "27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
+ "71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
#else
[assembly: InternalsVisibleTo("Grpc.Core.Tests")]
[assembly: InternalsVisibleTo("Grpc.Core.Testing")]
[assembly: InternalsVisibleTo("Grpc.IntegrationTesting")]
+[assembly: InternalsVisibleTo("Grpc.Microbenchmarks")]
#endif
diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include
index 8388bfd9cc..81156452f3 100755
--- a/src/csharp/Grpc.Core/Version.csproj.include
+++ b/src/csharp/Grpc.Core/Version.csproj.include
@@ -1,7 +1,7 @@
<!-- This file is generated -->
<Project>
<PropertyGroup>
- <GrpcCsharpVersion>1.4.0-dev</GrpcCsharpVersion>
+ <GrpcCsharpVersion>1.5.0-dev</GrpcCsharpVersion>
<GoogleProtobufVersion>3.3.0</GoogleProtobufVersion>
</PropertyGroup>
</Project>
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index 2e55d9d80e..d507878c2d 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -48,11 +48,11 @@ namespace Grpc.Core
/// <summary>
/// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies
/// </summary>
- public const string CurrentAssemblyFileVersion = "1.4.0.0";
+ public const string CurrentAssemblyFileVersion = "1.5.0.0";
/// <summary>
/// Current version of gRPC C#
/// </summary>
- public const string CurrentVersion = "1.4.0-dev";
+ public const string CurrentVersion = "1.5.0-dev";
}
}
diff --git a/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj b/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj
index eac6e1fc95..b54311bbd5 100755
--- a/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj
+++ b/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj
@@ -15,6 +15,9 @@
<PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion>
+ <IncludeSymbols>true</IncludeSymbols>
+ <IncludeSource>true</IncludeSource>
+ <GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
<ItemGroup>
@@ -22,7 +25,9 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
+ <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj">
+ <PrivateAssets>None</PrivateAssets>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index c9f7c42b71..a0eb468c5b 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -140,7 +140,8 @@ namespace Grpc.IntegrationTesting
readonly ClientType clientType;
readonly RpcType rpcType;
readonly PayloadConfig payloadConfig;
- readonly Histogram histogram;
+ readonly Lazy<byte[]> cachedByteBufferRequest;
+ readonly ThreadLocal<Histogram> threadLocalHistogram;
readonly List<Task> runnerTasks;
readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
@@ -155,7 +156,8 @@ namespace Grpc.IntegrationTesting
this.clientType = clientType;
this.rpcType = rpcType;
this.payloadConfig = payloadConfig;
- this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
+ this.cachedByteBufferRequest = new Lazy<byte[]>(() => new byte[payloadConfig.BytebufParams.ReqSize]);
+ this.threadLocalHistogram = new ThreadLocal<Histogram>(() => new Histogram(histogramParams.Resolution, histogramParams.MaxPossible), true);
this.runnerTasks = new List<Task>();
foreach (var channel in this.channels)
@@ -171,7 +173,12 @@ namespace Grpc.IntegrationTesting
public ClientStats GetStats(bool reset)
{
- var histogramData = histogram.GetSnapshot(reset);
+ var histogramData = new HistogramData();
+ foreach (var hist in threadLocalHistogram.Values)
+ {
+ hist.GetSnapshot(histogramData, reset);
+ }
+
var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;
if (reset)
@@ -179,8 +186,8 @@ namespace Grpc.IntegrationTesting
statsResetCount.Increment();
}
- GrpcEnvironment.Logger.Info("[ClientRunnerImpl.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, gen3 {3} (histogram reset count:{4}, seconds since reset: {5})",
- GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), GC.CollectionCount(3), statsResetCount.Count, secondsElapsed);
+ GrpcEnvironment.Logger.Info("[ClientRunnerImpl.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, (histogram reset count:{3}, seconds since reset: {4})",
+ GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), statsResetCount.Count, secondsElapsed);
// TODO: populate user time and system time
return new ClientStats
@@ -232,7 +239,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
- histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
timer.WaitForNext();
}
@@ -251,7 +258,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
- histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
@@ -273,7 +280,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
- histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
@@ -286,7 +293,7 @@ namespace Grpc.IntegrationTesting
private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer)
{
- var request = CreateByteBufferRequest();
+ var request = cachedByteBufferRequest.Value;
var stopwatch = new Stopwatch();
var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
@@ -301,7 +308,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
- histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
@@ -351,11 +358,6 @@ namespace Grpc.IntegrationTesting
};
}
- private byte[] CreateByteBufferRequest()
- {
- return new byte[payloadConfig.BytebufParams.ReqSize];
- }
-
private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
diff --git a/src/csharp/Grpc.IntegrationTesting/Histogram.cs b/src/csharp/Grpc.IntegrationTesting/Histogram.cs
index 28d1f078a9..9d33c497e6 100644
--- a/src/csharp/Grpc.IntegrationTesting/Histogram.cs
+++ b/src/csharp/Grpc.IntegrationTesting/Histogram.cs
@@ -84,15 +84,27 @@ namespace Grpc.IntegrationTesting
}
}
-
/// <summary>
- /// Gets snapshot of stats and reset
+ /// Gets snapshot of stats and optionally resets the histogram.
/// </summary>
public HistogramData GetSnapshot(bool reset = false)
{
lock (myLock)
{
- return GetSnapshotUnsafe(reset);
+ var histogramData = new HistogramData();
+ GetSnapshotUnsafe(histogramData, reset);
+ return histogramData;
+ }
+ }
+
+ /// <summary>
+ /// Merges snapshot of stats into <c>mergeTo</c> and optionally resets the histogram.
+ /// </summary>
+ public void GetSnapshot(HistogramData mergeTo, bool reset)
+ {
+ lock (myLock)
+ {
+ GetSnapshotUnsafe(mergeTo, reset);
}
}
@@ -117,24 +129,39 @@ namespace Grpc.IntegrationTesting
this.buckets[FindBucket(value)]++;
}
- private HistogramData GetSnapshotUnsafe(bool reset)
+ private void GetSnapshotUnsafe(HistogramData mergeTo, bool reset)
{
- var data = new HistogramData
+ GrpcPreconditions.CheckArgument(mergeTo.Bucket.Count == 0 || mergeTo.Bucket.Count == buckets.Length);
+ if (mergeTo.Count == 0)
{
- Count = count,
- Sum = sum,
- SumOfSquares = sumOfSquares,
- MinSeen = min,
- MaxSeen = max,
- Bucket = { buckets }
- };
+ mergeTo.MinSeen = min;
+ mergeTo.MaxSeen = max;
+ }
+ else
+ {
+ mergeTo.MinSeen = Math.Min(mergeTo.MinSeen, min);
+ mergeTo.MaxSeen = Math.Max(mergeTo.MaxSeen, max);
+ }
+ mergeTo.Count += count;
+ mergeTo.Sum += sum;
+ mergeTo.SumOfSquares += sumOfSquares;
- if (reset)
+ if (mergeTo.Bucket.Count == 0)
{
- ResetUnsafe();
+ mergeTo.Bucket.AddRange(buckets);
+ }
+ else
+ {
+ for (int i = 0; i < buckets.Length; i++)
+ {
+ mergeTo.Bucket[i] += buckets[i];
+ }
}
- return data;
+ if (reset)
+ {
+ ResetUnsafe();
+ }
}
private void ResetUnsafe()
diff --git a/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs b/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs
index fa160cbd15..e8a2ed0c5b 100644
--- a/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs
@@ -73,7 +73,7 @@ namespace Grpc.IntegrationTesting
{
var hist = new Histogram(0.01, 60e9);
hist.AddObservation(-0.5); // should be in the first bucket
- hist.AddObservation(1e12); // should be in the last bucket
+ hist.AddObservation(1e12); // should be in the last bucket
var data = hist.GetSnapshot();
Assert.AreEqual(1, data.Bucket[0]);
@@ -81,6 +81,30 @@ namespace Grpc.IntegrationTesting
}
[Test]
+ public void MergeSnapshots()
+ {
+ var data = new HistogramData();
+
+ var hist1 = new Histogram(0.01, 60e9);
+ hist1.AddObservation(-0.5); // should be in the first bucket
+ hist1.AddObservation(1e12); // should be in the last bucket
+ hist1.GetSnapshot(data, false);
+
+ var hist2 = new Histogram(0.01, 60e9);
+ hist2.AddObservation(10000);
+ hist2.AddObservation(11000);
+ hist2.GetSnapshot(data, false);
+
+ Assert.AreEqual(4, data.Count);
+ Assert.AreEqual(-0.5, data.MinSeen);
+ Assert.AreEqual(1e12, data.MaxSeen);
+ Assert.AreEqual(1, data.Bucket[0]);
+ Assert.AreEqual(1, data.Bucket[925]);
+ Assert.AreEqual(1, data.Bucket[935]);
+ Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]);
+ }
+
+ [Test]
public void Reset()
{
var hist = new Histogram(0.01, 60e9);
diff --git a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
index 486befe964..fbdb8fa3d6 100644
--- a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
+++ b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
@@ -100,8 +100,8 @@ namespace Grpc.IntegrationTesting
await tcs.Task;
await server.ShutdownAsync();
- GrpcEnvironment.Logger.Info("GC collection counts (after shutdown): gen0 {0}, gen1 {1}, gen2 {2}, gen3 {3}",
- GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), GC.CollectionCount(3));
+ GrpcEnvironment.Logger.Info("GC collection counts (after shutdown): gen0 {0}, gen1 {1}, gen2 {2}",
+ GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2));
}
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
index 7ab7734700..5acfce19c3 100644
--- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
@@ -154,8 +154,8 @@ namespace Grpc.IntegrationTesting
{
var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;
- GrpcEnvironment.Logger.Info("[ServerRunner.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, gen3 {3} (seconds since last reset {4})",
- GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), GC.CollectionCount(3), secondsElapsed);
+ GrpcEnvironment.Logger.Info("[ServerRunner.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, (seconds since last reset {3})",
+ GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), secondsElapsed);
// TODO: populate user time and system time
return new ServerStats
diff --git a/src/csharp/Grpc.Microbenchmarks/.gitignore b/src/csharp/Grpc.Microbenchmarks/.gitignore
new file mode 100644
index 0000000000..1746e3269e
--- /dev/null
+++ b/src/csharp/Grpc.Microbenchmarks/.gitignore
@@ -0,0 +1,2 @@
+bin
+obj
diff --git a/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj
new file mode 100644
index 0000000000..26a940e488
--- /dev/null
+++ b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj
@@ -0,0 +1,28 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <Import Project="..\Grpc.Core\Version.csproj.include" />
+ <Import Project="..\Grpc.Core\Common.csproj.include" />
+
+ <PropertyGroup>
+ <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <AssemblyName>Grpc.Microbenchmarks</AssemblyName>
+ <OutputType>Exe</OutputType>
+ <PackageId>Grpc.Microbenchmarks</PackageId>
+ <PackageTargetFallback Condition=" '$(TargetFramework)' == 'netcoreapp1.0' ">$(PackageTargetFallback);portable-net45</PackageTargetFallback>
+ <RuntimeFrameworkVersion Condition=" '$(TargetFramework)' == 'netcoreapp1.0' ">1.0.4</RuntimeFrameworkVersion>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
+ </ItemGroup>
+
+ <ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
+ <Reference Include="System" />
+ <Reference Include="Microsoft.CSharp" />
+ </ItemGroup>
+
+ <ItemGroup>
+ <Compile Include="..\Grpc.Core\Version.cs" />
+ </ItemGroup>
+
+</Project>
diff --git a/src/csharp/Grpc.Microbenchmarks/Program.cs b/src/csharp/Grpc.Microbenchmarks/Program.cs
new file mode 100644
index 0000000000..a0ca1f75ae
--- /dev/null
+++ b/src/csharp/Grpc.Microbenchmarks/Program.cs
@@ -0,0 +1,55 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Logging;
+
+namespace Grpc.Microbenchmarks
+{
+ class Program
+ {
+ public static void Main(string[] args)
+ {
+ GrpcEnvironment.SetLogger(new TextWriterLogger(Console.Error));
+ var benchmark = new SendMessageBenchmark();
+ benchmark.Init();
+ foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12})
+ {
+ benchmark.Run(threadCount, 4 * 1000 * 1000, 0);
+ }
+ benchmark.Cleanup();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
new file mode 100644
index 0000000000..eea375824f
--- /dev/null
+++ b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
@@ -0,0 +1,106 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using System.Collections.Generic;
+using System.Diagnostics;
+
+namespace Grpc.Microbenchmarks
+{
+ public class SendMessageBenchmark
+ {
+ static readonly NativeMethods Native = NativeMethods.Get();
+
+ GrpcEnvironment environment;
+
+ public void Init()
+ {
+ Native.grpcsharp_test_override_method("grpcsharp_call_start_batch", "nop");
+ environment = GrpcEnvironment.AddRef();
+ }
+
+ public void Cleanup()
+ {
+ GrpcEnvironment.ReleaseAsync().Wait();
+ // TODO(jtattermusch): track GC stats
+ }
+
+ public void Run(int threadCount, int iterations, int payloadSize)
+ {
+ Console.WriteLine(string.Format("SendMessageBenchmark: threads={0}, iterations={1}, payloadSize={2}", threadCount, iterations, payloadSize));
+ var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, payloadSize));
+ threadedBenchmark.Run();
+ }
+
+ private void ThreadBody(int iterations, int payloadSize)
+ {
+ // TODO(jtattermusch): parametrize by number of pending completions.
+ // TODO(jtattermusch): parametrize by cached/non-cached BatchContextSafeHandle
+
+ var completionRegistry = new CompletionRegistry(environment);
+ var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry);
+ var call = CreateFakeCall(cq);
+
+ var sendCompletionHandler = new SendCompletionHandler((success) => { });
+ var payload = new byte[payloadSize];
+ var writeFlags = default(WriteFlags);
+
+ var stopwatch = Stopwatch.StartNew();
+ for (int i = 0; i < iterations; i++)
+ {
+ call.StartSendMessage(sendCompletionHandler, payload, writeFlags, false);
+ var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey);
+ callback(true);
+ }
+ stopwatch.Stop();
+ Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);
+
+ cq.Dispose();
+ }
+
+ private static CallSafeHandle CreateFakeCall(CompletionQueueSafeHandle cq)
+ {
+ var call = CallSafeHandle.CreateFake(new IntPtr(0xdead), cq);
+ bool success = false;
+ while (!success)
+ {
+ // avoid calling destroy on a nonexistent grpc_call pointer
+ call.DangerousAddRef(ref success);
+ }
+ return call;
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs
new file mode 100644
index 0000000000..1c54624034
--- /dev/null
+++ b/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs
@@ -0,0 +1,79 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using System.Collections.Generic;
+using System.Diagnostics;
+
+namespace Grpc.Microbenchmarks
+{
+ public class ThreadedBenchmark
+ {
+ List<ThreadStart> runners;
+
+ public ThreadedBenchmark(IEnumerable<ThreadStart> runners)
+ {
+ this.runners = new List<ThreadStart>(runners);
+ }
+
+ public ThreadedBenchmark(int threadCount, Action threadBody)
+ {
+ this.runners = new List<ThreadStart>();
+ for (int i = 0; i < threadCount; i++)
+ {
+ this.runners.Add(new ThreadStart(() => threadBody()));
+ }
+ }
+
+ public void Run()
+ {
+ Console.WriteLine("Running threads.");
+ var threads = new List<Thread>();
+ for (int i = 0; i < runners.Count; i++)
+ {
+ var thread = new Thread(runners[i]);
+ thread.Start();
+ threads.Add(thread);
+ }
+
+ foreach (var thread in threads)
+ {
+ thread.Join();
+ }
+ Console.WriteLine("All threads finished.");
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj b/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj
index 70bfcc89c5..929a78edcb 100755
--- a/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj
+++ b/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj
@@ -15,6 +15,9 @@
<PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion>
+ <IncludeSymbols>true</IncludeSymbols>
+ <IncludeSource>true</IncludeSource>
+ <GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
<ItemGroup>
@@ -22,7 +25,9 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
+ <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj">
+ <PrivateAssets>None</PrivateAssets>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
diff --git a/src/csharp/Grpc.sln b/src/csharp/Grpc.sln
index beab3ccb36..d9a7b8d556 100644
--- a/src/csharp/Grpc.sln
+++ b/src/csharp/Grpc.sln
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
-VisualStudioVersion = 15.0.26228.4
+VisualStudioVersion = 15.0.26430.4
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Core", "Grpc.Core\Grpc.Core.csproj", "{BD878CB3-BDB4-46AB-84EF-C3B4729F56BC}"
EndProject
@@ -37,6 +37,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Reflection", "Grpc.Ref
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Reflection.Tests", "Grpc.Reflection.Tests\Grpc.Reflection.Tests.csproj", "{335AD0A2-F2CC-4C2E-853C-26174206BEE7}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Microbenchmarks", "Grpc.Microbenchmarks\Grpc.Microbenchmarks.csproj", "{84C17746-4727-4290-8E8B-A380793DAE1E}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -111,6 +113,10 @@ Global
{335AD0A2-F2CC-4C2E-853C-26174206BEE7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{335AD0A2-F2CC-4C2E-853C-26174206BEE7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{335AD0A2-F2CC-4C2E-853C-26174206BEE7}.Release|Any CPU.Build.0 = Release|Any CPU
+ {84C17746-4727-4290-8E8B-A380793DAE1E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {84C17746-4727-4290-8E8B-A380793DAE1E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {84C17746-4727-4290-8E8B-A380793DAE1E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {84C17746-4727-4290-8E8B-A380793DAE1E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat
index d823942be5..35664cc762 100755
--- a/src/csharp/build_packages_dotnetcli.bat
+++ b/src/csharp/build_packages_dotnetcli.bat
@@ -28,7 +28,7 @@
@rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
@rem Current package versions
-set VERSION=1.4.0-dev
+set VERSION=1.5.0-dev
@rem Adjust the location of nuget.exe
set NUGET=C:\nuget\nuget.exe
@@ -51,11 +51,11 @@ powershell -Command "cp -r ..\..\platform=*\artifacts\protoc_* protoc_plugins"
@rem To be able to build, we also need to put grpc_csharp_ext to its normal location
xcopy /Y /I nativelibs\csharp_ext_windows_x64\grpc_csharp_ext.dll ..\..\cmake\build\x64\Release\
-%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Core --output ..\..\..\artifacts || goto :error
-%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Core.Testing --output ..\..\..\artifacts || goto :error
-%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Auth --output ..\..\..\artifacts || goto :error
-%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.HealthCheck --output ..\..\..\artifacts || goto :error
-%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Reflection --output ..\..\..\artifacts || goto :error
+%DOTNET% pack --configuration Release Grpc.Core --output ..\..\..\artifacts || goto :error
+%DOTNET% pack --configuration Release Grpc.Core.Testing --output ..\..\..\artifacts || goto :error
+%DOTNET% pack --configuration Release Grpc.Auth --output ..\..\..\artifacts || goto :error
+%DOTNET% pack --configuration Release Grpc.HealthCheck --output ..\..\..\artifacts || goto :error
+%DOTNET% pack --configuration Release Grpc.Reflection --output ..\..\..\artifacts || goto :error
%NUGET% pack Grpc.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts || goto :error
%NUGET% pack Grpc.Tools.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts
diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh
index f79c97fbbc..7dc07a220d 100755
--- a/src/csharp/build_packages_dotnetcli.sh
+++ b/src/csharp/build_packages_dotnetcli.sh
@@ -48,13 +48,13 @@ dotnet restore Grpc.sln
mkdir -p ../../libs/opt
cp nativelibs/csharp_ext_linux_x64/libgrpc_csharp_ext.so ../../libs/opt
-dotnet pack --configuration Release --include-symbols --include-source Grpc.Core --output ../../../artifacts
-dotnet pack --configuration Release --include-symbols --include-source Grpc.Core.Testing --output ../../../artifacts
-dotnet pack --configuration Release --include-symbols --include-source Grpc.Auth --output ../../../artifacts
-dotnet pack --configuration Release --include-symbols --include-source Grpc.HealthCheck --output ../../../artifacts
-dotnet pack --configuration Release --include-symbols --include-source Grpc.Reflection --output ../../../artifacts
-
-nuget pack Grpc.nuspec -Version "1.4.0-dev" -OutputDirectory ../../artifacts
-nuget pack Grpc.Tools.nuspec -Version "1.4.0-dev" -OutputDirectory ../../artifacts
+dotnet pack --configuration Release Grpc.Core --output ../../../artifacts
+dotnet pack --configuration Release Grpc.Core.Testing --output ../../../artifacts
+dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts
+dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts
+dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts
+
+nuget pack Grpc.nuspec -Version "1.5.0-dev" -OutputDirectory ../../artifacts
+nuget pack Grpc.Tools.nuspec -Version "1.5.0-dev" -OutputDirectory ../../artifacts
(cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg)
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index f6cff454bd..a56113eca3 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -529,6 +529,38 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
grpc_call_unref(call);
}
+typedef grpc_call_error (*grpcsharp_call_start_batch_func)(grpc_call *call,
+ const grpc_op *ops,
+ size_t nops,
+ void *tag,
+ void *reserved);
+
+/* Only for testing */
+static grpc_call_error grpcsharp_call_start_batch_nop(grpc_call *call,
+ const grpc_op *ops,
+ size_t nops, void *tag,
+ void *reserved) {
+ return GRPC_CALL_OK;
+}
+
+static grpc_call_error grpcsharp_call_start_batch_default(grpc_call *call,
+ const grpc_op *ops,
+ size_t nops,
+ void *tag,
+ void *reserved) {
+ return grpc_call_start_batch(call, ops, nops, tag, reserved);
+}
+
+static grpcsharp_call_start_batch_func g_call_start_batch_func =
+ grpcsharp_call_start_batch_default;
+
+static grpc_call_error grpcsharp_call_start_batch(grpc_call *call,
+ const grpc_op *ops,
+ size_t nops, void *tag,
+ void *reserved) {
+ return g_call_start_batch_func(call, ops, nops, tag, reserved);
+}
+
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary(
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
size_t send_buffer_len, uint32_t write_flags,
@@ -576,8 +608,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary(
ops[5].flags = 0;
ops[5].reserved = NULL;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]),
+ ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming(
@@ -616,8 +648,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming(
ops[3].flags = 0;
ops[3].reserved = NULL;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]),
+ ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
@@ -656,8 +688,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[3].flags = 0;
ops[3].reserved = NULL;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]),
+ ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_duplex_streaming(
@@ -685,8 +717,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_duplex_streaming(
ops[1].flags = 0;
ops[1].reserved = NULL;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]),
+ ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata(
@@ -699,8 +731,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata(
ops[0].flags = 0;
ops[0].reserved = NULL;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]),
+ ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(
@@ -720,7 +752,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(
ops[1].flags = 0;
ops[1].reserved = NULL;
- return grpc_call_start_batch(call, ops, nops, ctx, NULL);
+ return grpcsharp_call_start_batch(call, ops, nops, ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_close_from_client(
@@ -731,8 +763,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_close_from_client(
ops[0].flags = 0;
ops[0].reserved = NULL;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]),
+ ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
@@ -773,7 +805,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ops[nops].reserved = NULL;
nops++;
}
- return grpc_call_start_batch(call, ops, nops, ctx, NULL);
+ return grpcsharp_call_start_batch(call, ops, nops, ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -784,8 +816,8 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
ops[0].data.recv_message.recv_message = &(ctx->recv_message);
ops[0].flags = 0;
ops[0].reserved = NULL;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]),
+ ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -798,8 +830,8 @@ grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
ops[0].flags = 0;
ops[0].reserved = NULL;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]),
+ ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_initial_metadata(
@@ -817,8 +849,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_initial_metadata(
ops[0].flags = 0;
ops[0].reserved = NULL;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]),
+ ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -1092,3 +1124,17 @@ GPR_EXPORT void *GPR_CALLTYPE grpcsharp_test_nop(void *ptr) { return ptr; }
GPR_EXPORT int32_t GPR_CALLTYPE grpcsharp_sizeof_grpc_event(void) {
return sizeof(grpc_event);
}
+
+/* Override a method for testing */
+GPR_EXPORT void GPR_CALLTYPE
+grpcsharp_test_override_method(const char *method_name, const char *variant) {
+ if (strcmp("grpcsharp_call_start_batch", method_name) == 0) {
+ if (strcmp("nop", variant) == 0) {
+ g_call_start_batch_func = grpcsharp_call_start_batch_nop;
+ } else {
+ GPR_ASSERT(0);
+ }
+ } else {
+ GPR_ASSERT(0);
+ }
+}
diff --git a/src/node/README.md b/src/node/README.md
index 4b906643bc..3b98b97879 100644
--- a/src/node/README.md
+++ b/src/node/README.md
@@ -2,9 +2,9 @@
# Node.js gRPC Library
## PREREQUISITES
-- `node`: This requires `node` to be installed, version `0.12` or above. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package.
+- `node`: This requires `node` to be installed, version `4.0` or above. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package.
-- **Note:** If you installed `node` via a package manager and the version is still less than `0.12`, try directly installing it from [nodejs.org](https://nodejs.org).
+- **Note:** If you installed `node` via a package manager and the version is still less than `4.0`, try directly installing it from [nodejs.org](https://nodejs.org).
## INSTALLATION
@@ -16,7 +16,7 @@ npm install grpc
## BUILD FROM SOURCE
1. Clone [the grpc Git Repository](https://github.com/grpc/grpc).
- 2. Run `npm install` from the repository root.
+ 2. Run `npm install --build-from-source` from the repository root.
- **Note:** On Windows, this might fail due to [nodejs issue #4932](https://github.com/nodejs/node/issues/4932) in which case, you will see something like the following in `npm install`'s output (towards the very beginning):
@@ -34,61 +34,3 @@ npm install grpc
## TESTING
To run the test suite, simply run `npm test` in the install location.
-
-## API
-This library internally uses [ProtoBuf.js](https://github.com/dcodeIO/ProtoBuf.js), and some structures it exports match those exported by that library.
-
-If you require this module, you will get an object with the following members
-
-```javascript
-function load(filename)
-```
-
-Takes a filename of a [Protocol Buffer](https://developers.google.com/protocol-buffers/) file, and returns an object representing the structure of the protocol buffer in the following way:
-
- - Namespaces become maps from the names of their direct members to those member objects
- - Service definitions become client constructors for clients for that service. They also have a `service` member that can be used for constructing servers.
- - Message definitions become Message constructors like those that ProtoBuf.js would create
- - Enum definitions become Enum objects like those that ProtoBuf.js would create
- - Anything else becomes the relevant reflection object that ProtoBuf.js would create
-
-
-```javascript
-function loadObject(reflectionObject)
-```
-
-Returns the same structure that `load` returns, but takes a reflection object from `ProtoBuf.js` instead of a file name.
-
-```javascript
-function Server([serverOptions])
-```
-
-Constructs a server to which service/implementation pairs can be added.
-
-
-```javascript
-status
-```
-
-An object mapping status names to status code numbers.
-
-
-```javascript
-callError
-```
-
-An object mapping call error names to codes. This is primarily useful for tracking down certain kinds of internal errors.
-
-
-```javascript
-Credentials
-```
-
-An object with factory methods for creating credential objects for clients.
-
-
-```javascript
-ServerCredentials
-```
-
-An object with factory methods for creating credential objects for servers.
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index 49179ab359..9453000ad3 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -508,9 +508,14 @@ void Call::DestroyCall() {
}
Call::Call(grpc_call *call)
- : wrapped_call(call), pending_batches(0), has_final_op_completed(false) {}
+ : wrapped_call(call), pending_batches(0), has_final_op_completed(false) {
+ peer = grpc_call_get_peer(call);
+}
-Call::~Call() { DestroyCall(); }
+Call::~Call() {
+ DestroyCall();
+ gpr_free(peer);
+}
void Call::Init(Local<Object> exports) {
HandleScope scope;
@@ -662,6 +667,16 @@ NAN_METHOD(Call::StartBatch) {
}
Local<Function> callback_func = info[1].As<Function>();
Call *call = ObjectWrap::Unwrap<Call>(info.This());
+ if (call->wrapped_call == NULL) {
+ /* This implies that the call has completed and has been destroyed. To
+ * emulate
+ * previous behavior, we should call the callback immediately with an error,
+ * as though the batch had failed in core */
+ Local<Value> argv[] = {
+ Nan::Error("The async function failed because the call has completed")};
+ Nan::Call(callback_func, Nan::New<Object>(), 1, argv);
+ return;
+ }
Local<Object> obj = Nan::To<Object>(info[0]).ToLocalChecked();
Local<Array> keys = Nan::GetOwnPropertyNames(obj).ToLocalChecked();
size_t nops = keys->Length();
@@ -727,6 +742,11 @@ NAN_METHOD(Call::Cancel) {
return Nan::ThrowTypeError("cancel can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(info.This());
+ if (call->wrapped_call == NULL) {
+ /* Cancel is supposed to be idempotent. If the call has already finished,
+ * cancel should just complete silently */
+ return;
+ }
grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL);
if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("cancel failed", error));
@@ -747,6 +767,11 @@ NAN_METHOD(Call::CancelWithStatus) {
"cancelWithStatus's second argument must be a string");
}
Call *call = ObjectWrap::Unwrap<Call>(info.This());
+ if (call->wrapped_call == NULL) {
+ /* Cancel is supposed to be idempotent. If the call has already finished,
+ * cancel should just complete silently */
+ return;
+ }
grpc_status_code code =
static_cast<grpc_status_code>(Nan::To<uint32_t>(info[0]).FromJust());
if (code == GRPC_STATUS_OK) {
@@ -763,9 +788,7 @@ NAN_METHOD(Call::GetPeer) {
return Nan::ThrowTypeError("getPeer can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(info.This());
- char *peer = grpc_call_get_peer(call->wrapped_call);
- Local<Value> peer_value = Nan::New(peer).ToLocalChecked();
- gpr_free(peer);
+ Local<Value> peer_value = Nan::New(call->peer).ToLocalChecked();
info.GetReturnValue().Set(peer_value);
}
@@ -780,6 +803,10 @@ NAN_METHOD(Call::SetCredentials) {
"setCredentials' first argument must be a CallCredentials");
}
Call *call = ObjectWrap::Unwrap<Call>(info.This());
+ if (call->wrapped_call == NULL) {
+ return Nan::ThrowError(
+ "Cannot set credentials on a call that has already started");
+ }
CallCredentials *creds_object = ObjectWrap::Unwrap<CallCredentials>(
Nan::To<Object>(info[0]).ToLocalChecked());
grpc_call_credentials *creds = creds_object->GetWrappedCredentials();
diff --git a/src/node/ext/call.h b/src/node/ext/call.h
index 0bd24f56a9..8f751279e4 100644
--- a/src/node/ext/call.h
+++ b/src/node/ext/call.h
@@ -96,6 +96,7 @@ class Call : public Nan::ObjectWrap {
call, this is GRPC_OP_RECV_STATUS_ON_CLIENT and for a server call, this
is GRPC_OP_SEND_STATUS_FROM_SERVER */
bool has_final_op_completed;
+ char *peer;
};
class Op {
diff --git a/src/node/health_check/package.json b/src/node/health_check/package.json
index 37c9b7a54f..0922f54a39 100644
--- a/src/node/health_check/package.json
+++ b/src/node/health_check/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc-health-check",
- "version": "1.4.0-dev",
+ "version": "1.5.0-dev",
"author": "Google Inc.",
"description": "Health check service for use with gRPC",
"repository": {
@@ -15,7 +15,7 @@
}
],
"dependencies": {
- "grpc": "^1.4.0-dev",
+ "grpc": "^1.5.0-dev",
"lodash": "^3.9.3",
"google-protobuf": "^3.0.0"
},
diff --git a/src/node/index.js b/src/node/index.js
index 2da77c3eae..177628e22d 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -1,5 +1,5 @@
-/*
- *
+/**
+ * @license
* Copyright 2015, Google Inc.
* All rights reserved.
*
@@ -31,10 +31,6 @@
*
*/
-/**
- * @module
- */
-
'use strict';
var path = require('path');
@@ -64,24 +60,30 @@ var constants = require('./src/constants.js');
grpc.setDefaultRootsPem(fs.readFileSync(SSL_ROOTS_PATH, 'ascii'));
/**
- * Load a ProtoBuf.js object as a gRPC object. The options object can provide
- * the following options:
- * - binaryAsBase64: deserialize bytes values as base64 strings instead of
- * Buffers. Defaults to false
- * - longsAsStrings: deserialize long values as strings instead of objects.
- * Defaults to true
- * - deprecatedArgumentOrder: Use the beta method argument order for client
- * methods, with optional arguments after the callback. Defaults to false.
- * This option is only a temporary stopgap measure to smooth an API breakage.
- * It is deprecated, and new code should not use it.
- * - protobufjsVersion: Available values are 5, 6, and 'detect'. 5 and 6
- * respectively indicate that an object from the corresponding version of
- * ProtoBuf.js is provided in the value argument. If the option is 'detect',
- * gRPC will guess what the version is based on the structure of the value.
- * Defaults to 'detect'.
+ * @namespace grpc
+ */
+
+/**
+ * Load a ProtoBuf.js object as a gRPC object.
+ * @memberof grpc
+ * @alias grpc.loadObject
* @param {Object} value The ProtoBuf.js reflection object to load
* @param {Object=} options Options to apply to the loaded file
- * @return {Object<string, *>} The resulting gRPC object
+ * @param {bool=} [options.binaryAsBase64=false] deserialize bytes values as
+ * base64 strings instead of Buffers
+ * @param {bool=} [options.longsAsStrings=true] deserialize long values as
+ * strings instead of objects
+ * @param {bool=} [options.enumsAsStrings=true] deserialize enum values as
+ * strings instead of numbers. Only works with Protobuf.js 6 values.
+ * @param {bool=} [options.deprecatedArgumentOrder=false] use the beta method
+ * argument order for client methods, with optional arguments after the
+ * callback. This option is only a temporary stopgap measure to smooth an
+ * API breakage. It is deprecated, and new code should not use it.
+ * @param {(number|string)=} [options.protobufjsVersion='detect'] 5 and 6
+ * respectively indicate that an object from the corresponding version of
+ * Protobuf.js is provided in the value argument. If the option is 'detect',
+ * gRPC wll guess what the version is based on the structure of the value.
+ * @return {Object<string, *>} The resulting gRPC object.
*/
exports.loadObject = function loadObject(value, options) {
options = _.defaults(options, common.defaultGrpcOptions);
@@ -112,22 +114,23 @@ exports.loadObject = function loadObject(value, options) {
var loadObject = exports.loadObject;
/**
- * Load a gRPC object from a .proto file. The options object can provide the
- * following options:
- * - convertFieldsToCamelCase: Load this file with field names in camel case
- * instead of their original case
- * - binaryAsBase64: deserialize bytes values as base64 strings instead of
- * Buffers. Defaults to false
- * - longsAsStrings: deserialize long values as strings instead of objects.
- * Defaults to true
- * - deprecatedArgumentOrder: Use the beta method argument order for client
- * methods, with optional arguments after the callback. Defaults to false.
- * This option is only a temporary stopgap measure to smooth an API breakage.
- * It is deprecated, and new code should not use it.
+ * Load a gRPC object from a .proto file.
+ * @memberof grpc
+ * @alias grpc.load
* @param {string|{root: string, file: string}} filename The file to load
* @param {string=} format The file format to expect. Must be either 'proto' or
* 'json'. Defaults to 'proto'
* @param {Object=} options Options to apply to the loaded file
+ * @param {bool=} [options.convertFieldsToCamelCase=false] Load this file with
+ * field names in camel case instead of their original case
+ * @param {bool=} [options.binaryAsBase64=false] deserialize bytes values as
+ * base64 strings instead of Buffers
+ * @param {bool=} [options.longsAsStrings=true] deserialize long values as
+ * strings instead of objects
+ * @param {bool=} [options.deprecatedArgumentOrder=false] use the beta method
+ * argument order for client methods, with optional arguments after the
+ * callback. This option is only a temporary stopgap measure to smooth an
+ * API breakage. It is deprecated, and new code should not use it.
* @return {Object<string, *>} The resulting gRPC object
*/
exports.load = function load(filename, format, options) {
@@ -168,6 +171,8 @@ var log_template = _.template(
* called. Note: the output format here is intended to be informational, and
* is not guaranteed to stay the same in the future.
* Logs will be directed to logger.error.
+ * @memberof grpc
+ * @alias grpc.setLogger
* @param {Console} logger A Console-like object.
*/
exports.setLogger = function setLogger(logger) {
@@ -187,6 +192,8 @@ exports.setLogger = function setLogger(logger) {
/**
* Sets the logger verbosity for gRPC module logging. The options are members
* of the grpc.logVerbosity map.
+ * @memberof grpc
+ * @alias grpc.setLogVerbosity
* @param {Number} verbosity The minimum severity to log
*/
exports.setLogVerbosity = function setLogVerbosity(verbosity) {
@@ -194,71 +201,70 @@ exports.setLogVerbosity = function setLogVerbosity(verbosity) {
grpc.setLogVerbosity(verbosity);
};
-/**
- * @see module:src/server.Server
- */
exports.Server = server.Server;
-/**
- * @see module:src/metadata
- */
exports.Metadata = Metadata;
-/**
- * Status name to code number mapping
- */
exports.status = constants.status;
-/**
- * Propagate flag name to number mapping
- */
exports.propagate = constants.propagate;
-/**
- * Call error name to code number mapping
- */
exports.callError = constants.callError;
-/**
- * Write flag name to code number mapping
- */
exports.writeFlags = constants.writeFlags;
-/**
- * Log verbosity setting name to code number mapping
- */
exports.logVerbosity = constants.logVerbosity;
-/**
- * Credentials factories
- */
exports.credentials = require('./src/credentials.js');
/**
* ServerCredentials factories
+ * @constructor ServerCredentials
+ * @memberof grpc
*/
exports.ServerCredentials = grpc.ServerCredentials;
/**
- * @see module:src/client.makeClientConstructor
+ * Create insecure server credentials
+ * @name grpc.ServerCredentials.createInsecure
+ * @kind function
+ * @return grpc.ServerCredentials
*/
-exports.makeGenericClientConstructor = client.makeClientConstructor;
/**
- * @see module:src/client.getClientChannel
+ * A private key and certificate pair
+ * @typedef {Object} grpc.ServerCredentials~keyCertPair
+ * @property {Buffer} privateKey The server's private key
+ * @property {Buffer} certChain The server's certificate chain
*/
-exports.getClientChannel = client.getClientChannel;
/**
- * @see module:src/client.waitForClientReady
+ * Create SSL server credentials
+ * @name grpc.ServerCredentials.createInsecure
+ * @kind function
+ * @param {?Buffer} rootCerts Root CA certificates for validating client
+ * certificates
+ * @param {Array<grpc.ServerCredentials~keyCertPair>} keyCertPairs A list of
+ * private key and certificate chain pairs to be used for authenticating
+ * the server
+ * @param {boolean} [checkClientCertificate=false] Indicates that the server
+ * should request and verify the client's certificates
+ * @return grpc.ServerCredentials
*/
+
+exports.makeGenericClientConstructor = client.makeClientConstructor;
+
+exports.getClientChannel = client.getClientChannel;
+
exports.waitForClientReady = client.waitForClientReady;
+/**
+ * @memberof grpc
+ * @alias grpc.closeClient
+ * @param {grpc.Client} client_obj The client to close
+ */
exports.closeClient = function closeClient(client_obj) {
client.Client.prototype.close.apply(client_obj);
};
-/**
- * @see module:src/client.Client
- */
exports.Client = client.Client;
diff --git a/src/node/performance/benchmark_client.js b/src/node/performance/benchmark_client.js
index e7c426b2ff..e0e68ffdef 100644
--- a/src/node/performance/benchmark_client.js
+++ b/src/node/performance/benchmark_client.js
@@ -227,18 +227,22 @@ BenchmarkClient.prototype.startClosedLoop = function(
makeCall = function(client) {
if (self.running) {
self.pending_calls++;
- var start_time = process.hrtime();
var call = client.streamingCall();
+ var start_time = process.hrtime();
call.write(argument);
call.on('data', function() {
- });
- call.on('end', function() {
var time_diff = process.hrtime(start_time);
self.histogram.add(timeDiffToNanos(time_diff));
- makeCall(client);
self.pending_calls--;
- if ((!self.running) && self.pending_calls == 0) {
- self.emit('finished');
+ if (self.running) {
+ self.pending_calls++;
+ start_time = process.hrtime();
+ call.write(argument);
+ } else {
+ call.end();
+ if (self.pending_calls == 0) {
+ self.emit('finished');
+ }
}
});
call.on('error', function(error) {
@@ -317,30 +321,8 @@ BenchmarkClient.prototype.startPoisson = function(
}
};
} else {
- makeCall = function(client, poisson) {
- if (self.running) {
- self.pending_calls++;
- var start_time = process.hrtime();
- var call = client.streamingCall();
- call.write(argument);
- call.on('data', function() {
- });
- call.on('end', function() {
- var time_diff = process.hrtime(start_time);
- self.histogram.add(timeDiffToNanos(time_diff));
- self.pending_calls--;
- if ((!self.running) && self.pending_calls == 0) {
- self.emit('finished');
- }
- });
- call.on('error', function(error) {
- self.emit('error', new Error('Client error: ' + error.message));
- self.running = false;
- });
- } else {
- poisson.stop();
- }
- };
+ self.emit('error', new Error('Streaming Poisson benchmarks not supported'));
+ return;
}
var averageIntervalMs = (1 / offered_load) * 1000;
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 16fe06a54d..f59ac5c94c 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -1,5 +1,5 @@
-/*
- *
+/**
+ * @license
* Copyright 2015, Google Inc.
* All rights reserved.
*
@@ -43,8 +43,6 @@
* var Client = proto_obj.package.subpackage.ServiceName;
* var client = new Client(server_address, client_credentials);
* var call = client.unaryMethod(arguments, callback);
- *
- * @module
*/
'use strict';
@@ -70,13 +68,26 @@ var Duplex = stream.Duplex;
var util = require('util');
var version = require('../../../package.json').version;
+/**
+ * Initial response metadata sent by the server when it starts processing the
+ * call
+ * @event grpc~ClientUnaryCall#metadata
+ * @type {grpc.Metadata}
+ */
+
+/**
+ * Status of the call when it has completed.
+ * @event grpc~ClientUnaryCall#status
+ * @type grpc~StatusObject
+ */
+
util.inherits(ClientUnaryCall, EventEmitter);
/**
- * An EventEmitter. Used for unary calls
- * @constructor
+ * An EventEmitter. Used for unary calls.
+ * @constructor grpc~ClientUnaryCall
* @extends external:EventEmitter
- * @param {grpc.Call} call The call object associated with the request
+ * @param {grpc.internal~Call} call The call object associated with the request
*/
function ClientUnaryCall(call) {
EventEmitter.call(this);
@@ -88,14 +99,16 @@ util.inherits(ClientWritableStream, Writable);
/**
* A stream that the client can write to. Used for calls that are streaming from
* the client side.
- * @constructor
+ * @constructor grpc~ClientWritableStream
* @extends external:Writable
- * @borrows module:src/client~ClientUnaryCall#cancel as
- * module:src/client~ClientWritableStream#cancel
- * @borrows module:src/client~ClientUnaryCall#getPeer as
- * module:src/client~ClientWritableStream#getPeer
- * @param {grpc.Call} call The call object to send data with
- * @param {module:src/common~serialize=} [serialize=identity] Serialization
+ * @borrows grpc~ClientUnaryCall#cancel as grpc~ClientWritableStream#cancel
+ * @borrows grpc~ClientUnaryCall#getPeer as grpc~ClientWritableStream#getPeer
+ * @borrows grpc~ClientUnaryCall#event:metadata as
+ * grpc~ClientWritableStream#metadata
+ * @borrows grpc~ClientUnaryCall#event:status as
+ * grpc~ClientWritableStream#status
+ * @param {grpc.internal~Call} call The call object to send data with
+ * @param {grpc~serialize=} [serialize=identity] Serialization
* function for writes.
*/
function ClientWritableStream(call, serialize) {
@@ -110,17 +123,36 @@ function ClientWritableStream(call, serialize) {
}
/**
+ * Write a message to the request stream. If serializing the argument fails,
+ * the call will be cancelled and the stream will end with an error.
+ * @name grpc~ClientWritableStream#write
+ * @kind function
+ * @override
+ * @param {*} message The message to write. Must be a valid argument to the
+ * serialize function of the corresponding method
+ * @param {grpc.writeFlags} flags Flags to modify how the message is written
+ * @param {Function} callback Callback for when this chunk of data is flushed
+ * @return {boolean} As defined for [Writable]{@link external:Writable}
+ */
+
+/**
* Attempt to write the given chunk. Calls the callback when done. This is an
* implementation of a method needed for implementing stream.Writable.
- * @access private
- * @param {Buffer} chunk The chunk to write
- * @param {string} encoding Used to pass write flags
+ * @private
+ * @param {*} chunk The chunk to write
+ * @param {grpc.writeFlags} encoding Used to pass write flags
* @param {function(Error=)} callback Called when the write is complete
*/
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
var message;
+ var self = this;
+ if (this.writeFailed) {
+ /* Once a write fails, just call the callback immediately to let the caller
+ flush any pending writes. */
+ setImmediate(callback);
+ }
try {
message = this.serialize(chunk);
} catch (e) {
@@ -141,8 +173,10 @@ function _write(chunk, encoding, callback) {
batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, event) {
if (err) {
- // Something has gone wrong. Stop writing by failing to call callback
- return;
+ /* Assume that the call is complete and that writing failed because a
+ status was received. In that case, set a flag to discard all future
+ writes */
+ self.writeFailed = true;
}
callback();
});
@@ -155,14 +189,16 @@ util.inherits(ClientReadableStream, Readable);
/**
* A stream that the client can read from. Used for calls that are streaming
* from the server side.
- * @constructor
+ * @constructor grpc~ClientReadableStream
* @extends external:Readable
- * @borrows module:src/client~ClientUnaryCall#cancel as
- * module:src/client~ClientReadableStream#cancel
- * @borrows module:src/client~ClientUnaryCall#getPeer as
- * module:src/client~ClientReadableStream#getPeer
- * @param {grpc.Call} call The call object to read data with
- * @param {module:src/common~deserialize=} [deserialize=identity]
+ * @borrows grpc~ClientUnaryCall#cancel as grpc~ClientReadableStream#cancel
+ * @borrows grpc~ClientUnaryCall#getPeer as grpc~ClientReadableStream#getPeer
+ * @borrows grpc~ClientUnaryCall#event:metadata as
+ * grpc~ClientReadableStream#metadata
+ * @borrows grpc~ClientUnaryCall#event:status as
+ * grpc~ClientReadableStream#status
+ * @param {grpc.internal~Call} call The call object to read data with
+ * @param {grpc~deserialize=} [deserialize=identity]
* Deserialization function for reads
*/
function ClientReadableStream(call, deserialize) {
@@ -183,7 +219,7 @@ function ClientReadableStream(call, deserialize) {
* parameter indicates that the call should end with that status. status
* defaults to OK if not provided.
* @param {Object!} status The status that the call should end with
- * @access private
+ * @private
*/
function _readsDone(status) {
/* jshint validthis: true */
@@ -202,7 +238,7 @@ ClientReadableStream.prototype._readsDone = _readsDone;
/**
* Called to indicate that we have received a status from the server.
- * @access private
+ * @private
*/
function _receiveStatus(status) {
/* jshint validthis: true */
@@ -215,7 +251,7 @@ ClientReadableStream.prototype._receiveStatus = _receiveStatus;
/**
* If we have both processed all incoming messages and received the status from
* the server, emit the status. Otherwise, do nothing.
- * @access private
+ * @private
*/
function _emitStatusIfDone() {
/* jshint validthis: true */
@@ -242,7 +278,7 @@ ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone;
/**
* Read the next object from the stream.
- * @access private
+ * @private
* @param {*} size Ignored because we use objectMode=true
*/
function _read(size) {
@@ -300,16 +336,19 @@ util.inherits(ClientDuplexStream, Duplex);
/**
* A stream that the client can read from or write to. Used for calls with
* duplex streaming.
- * @constructor
+ * @constructor grpc~ClientDuplexStream
* @extends external:Duplex
- * @borrows module:src/client~ClientUnaryCall#cancel as
- * module:src/client~ClientDuplexStream#cancel
- * @borrows module:src/client~ClientUnaryCall#getPeer as
- * module:src/client~ClientDuplexStream#getPeer
- * @param {grpc.Call} call Call object to proxy
- * @param {module:src/common~serialize=} [serialize=identity] Serialization
+ * @borrows grpc~ClientUnaryCall#cancel as grpc~ClientDuplexStream#cancel
+ * @borrows grpc~ClientUnaryCall#getPeer as grpc~ClientDuplexStream#getPeer
+ * @borrows grpc~ClientWritableStream#write as grpc~ClientDuplexStream#write
+ * @borrows grpc~ClientUnaryCall#event:metadata as
+ * grpc~ClientDuplexStream#metadata
+ * @borrows grpc~ClientUnaryCall#event:status as
+ * grpc~ClientDuplexStream#status
+ * @param {grpc.internal~Call} call Call object to proxy
+ * @param {grpc~serialize=} [serialize=identity] Serialization
* function for requests
- * @param {module:src/common~deserialize=} [deserialize=identity]
+ * @param {grpc~deserialize=} [deserialize=identity]
* Deserialization function for responses
*/
function ClientDuplexStream(call, serialize, deserialize) {
@@ -336,8 +375,9 @@ ClientDuplexStream.prototype._read = _read;
ClientDuplexStream.prototype._write = _write;
/**
- * Cancel the ongoing call
- * @alias module:src/client~ClientUnaryCall#cancel
+ * Cancel the ongoing call. Results in the call ending with a CANCELLED status,
+ * unless it has already ended with some other status.
+ * @alias grpc~ClientUnaryCall#cancel
*/
function cancel() {
/* jshint validthis: true */
@@ -352,7 +392,7 @@ ClientDuplexStream.prototype.cancel = cancel;
/**
* Get the endpoint this call/stream is connected to.
* @return {string} The URI of the endpoint
- * @alias module:src/client~ClientUnaryCall#getPeer
+ * @alias grpc~ClientUnaryCall#getPeer
*/
function getPeer() {
/* jshint validthis: true */
@@ -368,33 +408,31 @@ ClientDuplexStream.prototype.getPeer = getPeer;
* Any client call type
* @typedef {(ClientUnaryCall|ClientReadableStream|
* ClientWritableStream|ClientDuplexStream)}
- * module:src/client~Call
+ * grpc.Client~Call
*/
/**
* Options that can be set on a call.
- * @typedef {Object} module:src/client~CallOptions
- * @property {(date|number)} deadline The deadline for the entire call to
- * complete. A value of Infinity indicates that no deadline should be set.
- * @property {(string)} host Server hostname to set on the call. Only meaningful
+ * @typedef {Object} grpc.Client~CallOptions
+ * @property {grpc~Deadline} deadline The deadline for the entire call to
+ * complete.
+ * @property {string} host Server hostname to set on the call. Only meaningful
* if different from the server address used to construct the client.
- * @property {module:src/client~Call} parent Parent call. Used in servers when
+ * @property {grpc.Client~Call} parent Parent call. Used in servers when
* making a call as part of the process of handling a call. Used to
* propagate some information automatically, as specified by
* propagate_flags.
* @property {number} propagate_flags Indicates which properties of a parent
* call should propagate to this call. Bitwise combination of flags in
- * [grpc.propagate]{@link module:index.propagate}.
- * @property {module:src/credentials~CallCredentials} credentials The
- * credentials that should be used to make this particular call.
+ * {@link grpc.propagate}.
+ * @property {grpc.credentials~CallCredentials} credentials The credentials that
+ * should be used to make this particular call.
*/
/**
- * Get a call object built with the provided options. Keys for options are
- * 'deadline', which takes a date or number, and 'host', which takes a string
- * and overrides the hostname to connect to.
+ * Get a call object built with the provided options.
* @access private
- * @param {module:src/client~CallOptions=} options Options object.
+ * @param {grpc.Client~CallOptions=} options Options object.
*/
function getCall(channel, method, options) {
var deadline;
@@ -422,14 +460,14 @@ function getCall(channel, method, options) {
/**
* A generic gRPC client. Primarily useful as a base class for generated clients
- * @alias module:src/client.Client
+ * @memberof grpc
* @constructor
* @param {string} address Server address to connect to
- * @param {module:src/credentials~ChannelCredentials} credentials Credentials to
- * use to connect to the server
+ * @param {grpc~ChannelCredentials} credentials Credentials to use to connect to
+ * the server
* @param {Object} options Options to apply to channel creation
*/
-var Client = exports.Client = function Client(address, credentials, options) {
+function Client(address, credentials, options) {
if (!options) {
options = {};
}
@@ -445,19 +483,13 @@ var Client = exports.Client = function Client(address, credentials, options) {
/* Private fields use $ as a prefix instead of _ because it is an invalid
* prefix of a method name */
this.$channel = new grpc.Channel(address, credentials, options);
-};
+}
-/**
- * @typedef {Error} module:src/client.Client~ServiceError
- * @property {number} code The error code, a key of
- * [grpc.status]{@link module:src/client.status}
- * @property {module:metadata.Metadata} metadata Metadata sent with the status
- * by the server, if any
- */
+exports.Client = Client;
/**
- * @callback module:src/client.Client~requestCallback
- * @param {?module:src/client.Client~ServiceError} error The error, if the call
+ * @callback grpc.Client~requestCallback
+ * @param {?grpc~ServiceError} error The error, if the call
* failed
* @param {*} value The response value, if the call succeeded
*/
@@ -466,17 +498,17 @@ var Client = exports.Client = function Client(address, credentials, options) {
* Make a unary request to the given method, using the given serialize
* and deserialize functions, with the given argument.
* @param {string} method The name of the method to request
- * @param {module:src/common~serialize} serialize The serialization function for
+ * @param {grpc~serialize} serialize The serialization function for
* inputs
- * @param {module:src/common~deserialize} deserialize The deserialization
+ * @param {grpc~deserialize} deserialize The deserialization
* function for outputs
* @param {*} argument The argument to the call. Should be serializable with
* serialize
- * @param {module:src/metadata.Metadata=} metadata Metadata to add to the call
- * @param {module:src/client~CallOptions=} options Options map
- * @param {module:src/client.Client~requestCallback} callback The callback to
+ * @param {grpc.Metadata=} metadata Metadata to add to the call
+ * @param {grpc.Client~CallOptions=} options Options map
+ * @param {grpc.Client~requestCallback} callback The callback to
* for when the response is received
- * @return {EventEmitter} An event emitter for stream related events
+ * @return {grpc~ClientUnaryCall} An event emitter for stream related events
*/
Client.prototype.makeUnaryRequest = function(method, serialize, deserialize,
argument, metadata, options,
@@ -548,17 +580,17 @@ Client.prototype.makeUnaryRequest = function(method, serialize, deserialize,
* Make a client stream request to the given method, using the given serialize
* and deserialize functions, with the given argument.
* @param {string} method The name of the method to request
- * @param {module:src/common~serialize} serialize The serialization function for
+ * @param {grpc~serialize} serialize The serialization function for
* inputs
- * @param {module:src/common~deserialize} deserialize The deserialization
+ * @param {grpc~deserialize} deserialize The deserialization
* function for outputs
- * @param {module:src/metadata.Metadata=} metadata Array of metadata key/value
- * pairs to add to the call
- * @param {module:src/client~CallOptions=} options Options map
- * @param {Client~requestCallback} callback The callback to for when the
+ * @param {grpc.Metadata=} metadata Array of metadata key/value pairs to add to
+ * the call
+ * @param {grpc.Client~CallOptions=} options Options map
+ * @param {grpc.Client~requestCallback} callback The callback to for when the
* response is received
- * @return {module:src/client~ClientWritableStream} An event emitter for stream
- * related events
+ * @return {grpc~ClientWritableStream} An event emitter for stream related
+ * events
*/
Client.prototype.makeClientStreamRequest = function(method, serialize,
deserialize, metadata,
@@ -631,17 +663,16 @@ Client.prototype.makeClientStreamRequest = function(method, serialize,
* Make a server stream request to the given method, with the given serialize
* and deserialize function, using the given argument
* @param {string} method The name of the method to request
- * @param {module:src/common~serialize} serialize The serialization function for
- * inputs
- * @param {module:src/common~deserialize} deserialize The deserialization
+ * @param {grpc~serialize} serialize The serialization function for inputs
+ * @param {grpc~deserialize} deserialize The deserialization
* function for outputs
* @param {*} argument The argument to the call. Should be serializable with
* serialize
- * @param {module:src/metadata.Metadata=} metadata Array of metadata key/value
- * pairs to add to the call
- * @param {module:src/client~CallOptions=} options Options map
- * @return {module:src/client~ClientReadableStream} An event emitter for stream
- * related events
+ * @param {grpc.Metadata=} metadata Array of metadata key/value pairs to add to
+ * the call
+ * @param {grpc.Client~CallOptions=} options Options map
+ * @return {grpc~ClientReadableStream} An event emitter for stream related
+ * events
*/
Client.prototype.makeServerStreamRequest = function(method, serialize,
deserialize, argument,
@@ -693,15 +724,13 @@ Client.prototype.makeServerStreamRequest = function(method, serialize,
/**
* Make a bidirectional stream request with this method on the given channel.
* @param {string} method The name of the method to request
- * @param {module:src/common~serialize} serialize The serialization function for
- * inputs
- * @param {module:src/common~deserialize} deserialize The deserialization
+ * @param {grpc~serialize} serialize The serialization function for inputs
+ * @param {grpc~deserialize} deserialize The deserialization
* function for outputs
- * @param {module:src/metadata.Metadata=} metadata Array of metadata key/value
+ * @param {grpc.Metadata=} metadata Array of metadata key/value
* pairs to add to the call
- * @param {module:src/client~CallOptions=} options Options map
- * @return {module:src/client~ClientDuplexStream} An event emitter for stream
- * related events
+ * @param {grpc.Client~CallOptions=} options Options map
+ * @return {grpc~ClientDuplexStream} An event emitter for stream related events
*/
Client.prototype.makeBidiStreamRequest = function(method, serialize,
deserialize, metadata,
@@ -743,6 +772,9 @@ Client.prototype.makeBidiStreamRequest = function(method, serialize,
return stream;
};
+/**
+ * Close this client.
+ */
Client.prototype.close = function() {
this.$channel.close();
};
@@ -761,8 +793,7 @@ Client.prototype.getChannel = function() {
* with an error if the attempt to connect to the server has unrecoverablly
* failed or if the deadline expires. This function will make the channel
* start connecting if it has not already done so.
- * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass
- * Infinity to wait forever.
+ * @param {grpc~Deadline} deadline When to stop waiting for a connection.
* @param {function(Error)} callback The callback to call when done attempting
* to connect.
*/
@@ -788,7 +819,7 @@ Client.prototype.waitForReady = function(deadline, callback) {
/**
* Map with short names for each of the requester maker functions. Used in
* makeClientConstructor
- * @access private
+ * @private
*/
var requester_funcs = {
unary: Client.prototype.makeUnaryRequest,
@@ -834,9 +865,15 @@ var deprecated_request_wrap = {
/**
* Creates a constructor for a client with the given methods, as specified in
- * the methods argument.
- * @param {module:src/common~ServiceDefinition} methods An object mapping
- * method names to method attributes
+ * the methods argument. The resulting class will have an instance method for
+ * each method in the service, which is a partial application of one of the
+ * [Client]{@link grpc.Client} request methods, depending on `requestSerialize`
+ * and `responseSerialize`, with the `method`, `serialize`, and `deserialize`
+ * arguments predefined.
+ * @memberof grpc
+ * @alias grpc~makeGenericClientConstructor
+ * @param {grpc~ServiceDefinition} methods An object mapping method names to
+ * method attributes
* @param {string} serviceName The fully qualified name of the service
* @param {Object} class_options An options object.
* @param {boolean=} [class_options.deprecatedArgumentOrder=false] Indicates
@@ -844,9 +881,8 @@ var deprecated_request_wrap = {
* arguments at the end instead of the callback at the end. This option
* is only a temporary stopgap measure to smooth an API breakage.
* It is deprecated, and new code should not use it.
- * @return {function(string, Object)} New client constructor, which is a
- * subclass of [grpc.Client]{@link module:src/client.Client}, and has the
- * same arguments as that constructor.
+ * @return {function} New client constructor, which is a subclass of
+ * {@link grpc.Client}, and has the same arguments as that constructor.
*/
exports.makeClientConstructor = function(methods, serviceName,
class_options) {
@@ -898,8 +934,11 @@ exports.makeClientConstructor = function(methods, serviceName,
/**
* Return the underlying channel object for the specified client
+ * @memberof grpc
+ * @alias grpc~getClientChannel
* @param {Client} client
* @return {Channel} The channel
+ * @see grpc.Client#getChannel
*/
exports.getClientChannel = function(client) {
return Client.prototype.getChannel.call(client);
@@ -911,22 +950,15 @@ exports.getClientChannel = function(client) {
* with an error if the attempt to connect to the server has unrecoverablly
* failed or if the deadline expires. This function will make the channel
* start connecting if it has not already done so.
+ * @memberof grpc
+ * @alias grpc~waitForClientReady
* @param {Client} client The client to wait on
- * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass
+ * @param {grpc~Deadline} deadline When to stop waiting for a connection. Pass
* Infinity to wait forever.
* @param {function(Error)} callback The callback to call when done attempting
* to connect.
+ * @see grpc.Client#waitForReady
*/
exports.waitForClientReady = function(client, deadline, callback) {
Client.prototype.waitForReady.call(client, deadline, callback);
};
-
-/**
- * Map of status code names to status codes
- */
-exports.status = constants.status;
-
-/**
- * See docs for client.callError
- */
-exports.callError = grpc.callError;
diff --git a/src/node/src/common.js b/src/node/src/common.js
index 4dad60e630..0f835317ea 100644
--- a/src/node/src/common.js
+++ b/src/node/src/common.js
@@ -1,5 +1,5 @@
-/*
- *
+/**
+ * @license
* Copyright 2015, Google Inc.
* All rights reserved.
*
@@ -31,12 +31,6 @@
*
*/
-/**
- * This module contains functions that are common to client and server
- * code. None of them should be used directly by gRPC users.
- * @module
- */
-
'use strict';
var _ = require('lodash');
@@ -62,16 +56,19 @@ exports.wrapIgnoreNull = function wrapIgnoreNull(func) {
/**
* The logger object for the gRPC module. Defaults to console.
+ * @private
*/
exports.logger = console;
/**
* The current logging verbosity. 0 corresponds to logging everything
+ * @private
*/
exports.logVerbosity = 0;
/**
* Log a message if the severity is at least as high as the current verbosity
+ * @private
* @param {Number} severity A value of the grpc.logVerbosity map
* @param {String} message The message to log
*/
@@ -83,6 +80,7 @@ exports.log = function log(severity, message) {
/**
* Default options for loading proto files into gRPC
+ * @alias grpc~defaultLoadOptions
*/
exports.defaultGrpcOptions = {
convertFieldsToCamelCase: false,
@@ -95,6 +93,30 @@ exports.defaultGrpcOptions = {
// JSDoc definitions that are used in multiple other modules
/**
+ * Represents the status of a completed request. If `code` is
+ * {@link grpc.status}.OK, then the request has completed successfully.
+ * Otherwise, the request has failed, `details` will contain a description of
+ * the error. Either way, `metadata` contains the trailing response metadata
+ * sent by the server when it finishes processing the call.
+ * @typedef {object} grpc~StatusObject
+ * @property {number} code The error code, a key of {@link grpc.status}
+ * @property {string} details Human-readable description of the status
+ * @property {grpc.Metadata} metadata Trailing metadata sent with the status,
+ * if applicable
+ */
+
+/**
+ * Describes how a request has failed. The member `message` will be the same as
+ * `details` in {@link grpc~StatusObject}, and `code` and `metadata` are the
+ * same as in that object.
+ * @typedef {Error} grpc~ServiceError
+ * @property {number} code The error code, a key of {@link grpc.status} that is
+ * not `grpc.status.OK`
+ * @property {grpc.Metadata} metadata Trailing metadata sent with the status,
+ * if applicable
+ */
+
+/**
* The EventEmitter class in the event standard module
* @external EventEmitter
* @see https://nodejs.org/api/events.html#events_class_eventemitter
@@ -120,38 +142,46 @@ exports.defaultGrpcOptions = {
/**
* A serialization function
- * @callback module:src/common~serialize
+ * @callback grpc~serialize
* @param {*} value The value to serialize
* @return {Buffer} The value serialized as a byte sequence
*/
/**
* A deserialization function
- * @callback module:src/common~deserialize
+ * @callback grpc~deserialize
* @param {Buffer} data The byte sequence to deserialize
* @return {*} The data deserialized as a value
*/
/**
+ * The deadline of an operation. If it is a date, the deadline is reached at
+ * the date and time specified. If it is a finite number, it is treated as
+ * a number of milliseconds since the Unix Epoch. If it is Infinity, the
+ * deadline will never be reached. If it is -Infinity, the deadline has already
+ * passed.
+ * @typedef {(number|date)} grpc~Deadline
+ */
+
+/**
* An object that completely defines a service method signature.
- * @typedef {Object} module:src/common~MethodDefinition
+ * @typedef {Object} grpc~MethodDefinition
* @property {string} path The method's URL path
* @property {boolean} requestStream Indicates whether the method accepts
* a stream of requests
* @property {boolean} responseStream Indicates whether the method returns
* a stream of responses
- * @property {module:src/common~serialize} requestSerialize Serialization
+ * @property {grpc~serialize} requestSerialize Serialization
* function for request values
- * @property {module:src/common~serialize} responseSerialize Serialization
+ * @property {grpc~serialize} responseSerialize Serialization
* function for response values
- * @property {module:src/common~deserialize} requestDeserialize Deserialization
+ * @property {grpc~deserialize} requestDeserialize Deserialization
* function for request data
- * @property {module:src/common~deserialize} responseDeserialize Deserialization
+ * @property {grpc~deserialize} responseDeserialize Deserialization
* function for repsonse data
*/
/**
* An object that completely defines a service.
- * @typedef {Object.<string, module:src/common~MethodDefinition>}
- * module:src/common~ServiceDefinition
+ * @typedef {Object.<string, grpc~MethodDefinition>} grpc~ServiceDefinition
*/
diff --git a/src/node/src/constants.js b/src/node/src/constants.js
index 528dab120e..c441ee740b 100644
--- a/src/node/src/constants.js
+++ b/src/node/src/constants.js
@@ -1,5 +1,5 @@
-/*
- *
+/**
+ * @license
* Copyright 2017, Google Inc.
* All rights reserved.
*
@@ -31,16 +31,14 @@
*
*/
-/**
- * @module
- */
-
/* The comments about status codes are copied verbatim (with some formatting
* modifications) from include/grpc/impl/codegen/status.h, for the purpose of
* including them in generated documentation.
*/
/**
* Enum of status codes that gRPC can return
+ * @memberof grpc
+ * @alias grpc.status
* @readonly
* @enum {number}
*/
@@ -178,6 +176,8 @@ exports.status = {
* Users are encouraged to write propagation masks as deltas from the default.
* i.e. write `grpc.propagate.DEFAULTS & ~grpc.propagate.DEADLINE` to disable
* deadline propagation.
+ * @memberof grpc
+ * @alias grpc.propagate
* @enum {number}
*/
exports.propagate = {
@@ -194,9 +194,11 @@ exports.propagate = {
/**
* Call error constants. Call errors almost always indicate bugs in the gRPC
* library, and these error codes are mainly useful for finding those bugs.
+ * @memberof grpc
+ * @readonly
* @enum {number}
*/
-exports.callError = {
+const callError = {
OK: 0,
ERROR: 1,
NOT_ON_SERVER: 2,
@@ -213,9 +215,14 @@ exports.callError = {
PAYLOAD_TYPE_MISMATCH: 14
};
+exports.callError = callError;
+
/**
* Write flags: these can be bitwise or-ed to form write options that modify
* how data is written.
+ * @memberof grpc
+ * @alias grpc.writeFlags
+ * @readonly
* @enum {number}
*/
exports.writeFlags = {
@@ -232,6 +239,9 @@ exports.writeFlags = {
};
/**
+ * @memberof grpc
+ * @alias grpc.logVerbosity
+ * @readonly
* @enum {number}
*/
exports.logVerbosity = {
diff --git a/src/node/src/credentials.js b/src/node/src/credentials.js
index b1e86bbd09..b1dbc1c450 100644
--- a/src/node/src/credentials.js
+++ b/src/node/src/credentials.js
@@ -1,5 +1,5 @@
-/*
- *
+/**
+ * @license
* Copyright 2015, Google Inc.
* All rights reserved.
*
@@ -48,6 +48,7 @@
* For example, to create a client secured with SSL that uses Google
* default application credentials to authenticate:
*
+ * @example
* var channel_creds = credentials.createSsl(root_certs);
* (new GoogleAuth()).getApplicationDefault(function(err, credential) {
* var call_creds = credentials.createFromGoogleCredential(credential);
@@ -56,15 +57,25 @@
* var client = new Client(address, combined_creds);
* });
*
- * @module
+ * @namespace grpc.credentials
*/
'use strict';
var grpc = require('./grpc_extension');
+/**
+ * This cannot be constructed directly. Instead, instances of this class should
+ * be created using the factory functions in {@link grpc.credentials}
+ * @constructor grpc.credentials~CallCredentials
+ */
var CallCredentials = grpc.CallCredentials;
+/**
+ * This cannot be constructed directly. Instead, instances of this class should
+ * be created using the factory functions in {@link grpc.credentials}
+ * @constructor grpc.credentials~ChannelCredentials
+ */
var ChannelCredentials = grpc.ChannelCredentials;
var Metadata = require('./metadata.js');
@@ -76,24 +87,48 @@ var constants = require('./constants');
var _ = require('lodash');
/**
+ * @external GoogleCredential
+ * @see https://github.com/google/google-auth-library-nodejs
+ */
+
+/**
* Create an SSL Credentials object. If using a client-side certificate, both
* the second and third arguments must be passed.
+ * @memberof grpc.credentials
+ * @alias grpc.credentials.createSsl
+ * @kind function
* @param {Buffer} root_certs The root certificate data
* @param {Buffer=} private_key The client certificate private key, if
* applicable
* @param {Buffer=} cert_chain The client certificate cert chain, if applicable
- * @return {ChannelCredentials} The SSL Credentials object
+ * @return {grpc.credentials.ChannelCredentials} The SSL Credentials object
*/
exports.createSsl = ChannelCredentials.createSsl;
/**
+ * @callback grpc.credentials~metadataCallback
+ * @param {Error} error The error, if getting metadata failed
+ * @param {grpc.Metadata} metadata The metadata
+ */
+
+/**
+ * @callback grpc.credentials~generateMetadata
+ * @param {Object} params Parameters that can modify metadata generation
+ * @param {string} params.service_url The URL of the service that the call is
+ * going to
+ * @param {grpc.credentials~metadataCallback} callback
+ */
+
+/**
* Create a gRPC credentials object from a metadata generation function. This
* function gets the service URL and a callback as parameters. The error
* passed to the callback can optionally have a 'code' value attached to it,
* which corresponds to a status code that this library uses.
- * @param {function(String, function(Error, Metadata))} metadata_generator The
- * function that generates metadata
- * @return {CallCredentials} The credentials object
+ * @memberof grpc.credentials
+ * @alias grpc.credentials.createFromMetadataGenerator
+ * @param {grpc.credentials~generateMetadata} metadata_generator The function
+ * that generates metadata
+ * @return {grpc.credentials.CallCredentials} The credentials object
*/
exports.createFromMetadataGenerator = function(metadata_generator) {
return CallCredentials.createFromPlugin(function(service_url, cb_data,
@@ -119,8 +154,11 @@ exports.createFromMetadataGenerator = function(metadata_generator) {
/**
* Create a gRPC credential from a Google credential object.
- * @param {Object} google_credential The Google credential object to use
- * @return {CallCredentials} The resulting credentials object
+ * @memberof grpc.credentials
+ * @alias grpc.credentials.createFromGoogleCredential
+ * @param {external:GoogleCredential} google_credential The Google credential
+ * object to use
+ * @return {grpc.credentials.CallCredentials} The resulting credentials object
*/
exports.createFromGoogleCredential = function(google_credential) {
return exports.createFromMetadataGenerator(function(auth_context, callback) {
@@ -141,6 +179,8 @@ exports.createFromGoogleCredential = function(google_credential) {
/**
* Combine a ChannelCredentials with any number of CallCredentials into a single
* ChannelCredentials object.
+ * @memberof grpc.credentials
+ * @alias grpc.credentials.combineChannelCredentials
* @param {ChannelCredentials} channel_credential The ChannelCredentials to
* start with
* @param {...CallCredentials} credentials The CallCredentials to compose
@@ -157,6 +197,8 @@ exports.combineChannelCredentials = function(channel_credential) {
/**
* Combine any number of CallCredentials into a single CallCredentials object
+ * @memberof grpc.credentials
+ * @alias grpc.credentials.combineCallCredentials
* @param {...CallCredentials} credentials the CallCredentials to compose
* @return CallCredentials A credentials object that combines all of the input
* credentials
@@ -172,6 +214,9 @@ exports.combineCallCredentials = function() {
/**
* Create an insecure credentials object. This is used to create a channel that
* does not use SSL. This cannot be composed with anything.
+ * @memberof grpc.credentials
+ * @alias grpc.credentials.createInsecure
+ * @kind function
* @return {ChannelCredentials} The insecure credentials object
*/
exports.createInsecure = ChannelCredentials.createInsecure;
diff --git a/src/node/src/grpc_extension.js b/src/node/src/grpc_extension.js
index 63a281ddbc..864da97314 100644
--- a/src/node/src/grpc_extension.js
+++ b/src/node/src/grpc_extension.js
@@ -1,5 +1,5 @@
-/*
- *
+/**
+ * @license
* Copyright 2016, Google Inc.
* All rights reserved.
*
diff --git a/src/node/src/metadata.js b/src/node/src/metadata.js
index 92cf23998b..c757d520f8 100644
--- a/src/node/src/metadata.js
+++ b/src/node/src/metadata.js
@@ -1,5 +1,5 @@
-/*
- *
+/**
+ * @license
* Copyright 2015, Google Inc.
* All rights reserved.
*
@@ -31,15 +31,6 @@
*
*/
-/**
- * Metadata module
- *
- * This module defines the Metadata class, which represents header and trailer
- * metadata for gRPC calls.
- *
- * @module
- */
-
'use strict';
var _ = require('lodash');
@@ -48,8 +39,8 @@ var grpc = require('./grpc_extension');
/**
* Class for storing metadata. Keys are normalized to lowercase ASCII.
+ * @memberof grpc
* @constructor
- * @alias module:src/metadata.Metadata
* @example
* var metadata = new metadata_module.Metadata();
* metadata.set('key1', 'value1');
diff --git a/src/node/src/protobuf_js_5_common.js b/src/node/src/protobuf_js_5_common.js
index 4041e05390..1663a3a400 100644
--- a/src/node/src/protobuf_js_5_common.js
+++ b/src/node/src/protobuf_js_5_common.js
@@ -1,5 +1,5 @@
-/*
- *
+/**
+ * @license
* Copyright 2017, Google Inc.
* All rights reserved.
*
@@ -31,6 +31,11 @@
*
*/
+/**
+ * @module
+ * @private
+ */
+
'use strict';
var _ = require('lodash');
diff --git a/src/node/src/protobuf_js_6_common.js b/src/node/src/protobuf_js_6_common.js
index 00f11f2736..91a458aa20 100644
--- a/src/node/src/protobuf_js_6_common.js
+++ b/src/node/src/protobuf_js_6_common.js
@@ -1,5 +1,5 @@
-/*
- *
+/**
+ * @license
* Copyright 2017, Google Inc.
* All rights reserved.
*
@@ -31,6 +31,11 @@
*
*/
+/**
+ * @module
+ * @private
+ */
+
'use strict';
var _ = require('lodash');
diff --git a/src/node/src/server.js b/src/node/src/server.js
index 1d9cc7d2c1..ae4fcb1dc4 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -1,5 +1,5 @@
-/*
- *
+/**
+ * @license
* Copyright 2015, Google Inc.
* All rights reserved.
*
@@ -31,22 +31,6 @@
*
*/
-/**
- * Server module
- *
- * This module contains all the server code for Node gRPC: both the Server
- * class itself and the method handler code for all types of methods.
- *
- * For example, to create a Server, add a service, and start it:
- *
- * var server = new server_module.Server();
- * server.addProtoService(protobuf_service_descriptor, service_implementation);
- * server.bind('address:port', server_credential);
- * server.start();
- *
- * @module
- */
-
'use strict';
var _ = require('lodash');
@@ -70,9 +54,9 @@ var EventEmitter = require('events').EventEmitter;
/**
* Handle an error on a call by sending it as a status
- * @access private
- * @param {grpc.Call} call The call to send the error on
- * @param {Object} error The error object
+ * @private
+ * @param {grpc.internal~Call} call The call to send the error on
+ * @param {(Object|Error)} error The error object
*/
function handleError(call, error) {
var statusMetadata = new Metadata();
@@ -104,14 +88,14 @@ function handleError(call, error) {
/**
* Send a response to a unary or client streaming call.
- * @access private
+ * @private
* @param {grpc.Call} call The call to respond on
* @param {*} value The value to respond with
- * @param {function(*):Buffer=} serialize Serialization function for the
+ * @param {grpc~serialize} serialize Serialization function for the
* response
- * @param {Metadata=} metadata Optional trailing metadata to send with status
- * @param {number=} flags Flags for modifying how the message is sent.
- * Defaults to 0.
+ * @param {grpc.Metadata=} metadata Optional trailing metadata to send with
+ * status
+ * @param {number=} [flags=0] Flags for modifying how the message is sent.
*/
function sendUnaryResponse(call, value, serialize, metadata, flags) {
var end_batch = {};
@@ -146,7 +130,7 @@ function sendUnaryResponse(call, value, serialize, metadata, flags) {
/**
* Initialize a writable stream. This is used for both the writable and duplex
* stream constructors.
- * @access private
+ * @private
* @param {Writable} stream The stream to set up
* @param {function(*):Buffer=} Serialization function for responses
*/
@@ -225,9 +209,9 @@ function setUpWritable(stream, serialize) {
/**
* Initialize a readable stream. This is used for both the readable and duplex
* stream constructors.
- * @access private
+ * @private
* @param {Readable} stream The stream to initialize
- * @param {function(Buffer):*=} deserialize Deserialization function for
+ * @param {grpc~deserialize} deserialize Deserialization function for
* incoming data.
*/
function setUpReadable(stream, deserialize) {
@@ -245,34 +229,88 @@ function setUpReadable(stream, deserialize) {
});
}
+/**
+ * Emitted when the call has been cancelled. After this has been emitted, the
+ * call's `cancelled` property will be set to `true`.
+ * @event grpc~ServerUnaryCall~cancelled
+ */
+
util.inherits(ServerUnaryCall, EventEmitter);
-function ServerUnaryCall(call) {
+/**
+ * An EventEmitter. Used for unary calls.
+ * @constructor grpc~ServerUnaryCall
+ * @extends external:EventEmitter
+ * @param {grpc.internal~Call} call The call object associated with the request
+ * @param {grpc.Metadata} metadata The request metadata from the client
+ */
+function ServerUnaryCall(call, metadata) {
EventEmitter.call(this);
this.call = call;
+ /**
+ * Indicates if the call has been cancelled
+ * @member {boolean} grpc~ServerUnaryCall#cancelled
+ */
+ this.cancelled = false;
+ /**
+ * The request metadata from the client
+ * @member {grpc.Metadata} grpc~ServerUnaryCall#metadata
+ */
+ this.metadata = metadata;
+ /**
+ * The request message from the client
+ * @member {*} grpc~ServerUnaryCall#request
+ */
+ this.request = undefined;
}
+/**
+ * Emitted when the call has been cancelled. After this has been emitted, the
+ * call's `cancelled` property will be set to `true`.
+ * @event grpc~ServerWritableStream~cancelled
+ */
+
util.inherits(ServerWritableStream, Writable);
/**
* A stream that the server can write to. Used for calls that are streaming from
* the server side.
- * @constructor
- * @param {grpc.Call} call The call object to send data with
- * @param {function(*):Buffer=} serialize Serialization function for writes
+ * @constructor grpc~ServerWritableStream
+ * @extends external:Writable
+ * @borrows grpc~ServerUnaryCall#sendMetadata as
+ * grpc~ServerWritableStream#sendMetadata
+ * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerWritableStream#getPeer
+ * @param {grpc.internal~Call} call The call object to send data with
+ * @param {grpc.Metadata} metadata The request metadata from the client
+ * @param {grpc~serialize} serialize Serialization function for writes
*/
-function ServerWritableStream(call, serialize) {
+function ServerWritableStream(call, metadata, serialize) {
Writable.call(this, {objectMode: true});
this.call = call;
this.finished = false;
setUpWritable(this, serialize);
+ /**
+ * Indicates if the call has been cancelled
+ * @member {boolean} grpc~ServerWritableStream#cancelled
+ */
+ this.cancelled = false;
+ /**
+ * The request metadata from the client
+ * @member {grpc.Metadata} grpc~ServerWritableStream#metadata
+ */
+ this.metadata = metadata;
+ /**
+ * The request message from the client
+ * @member {*} grpc~ServerWritableStream#request
+ */
+ this.request = undefined;
}
/**
* Start writing a chunk of data. This is an implementation of a method required
* for implementing stream.Writable.
- * @access private
+ * @private
* @param {Buffer} chunk The chunk of data to write
* @param {string} encoding Used to pass write flags
* @param {function(Error=)} callback Callback to indicate that the write is
@@ -312,19 +350,40 @@ function _write(chunk, encoding, callback) {
ServerWritableStream.prototype._write = _write;
+/**
+ * Emitted when the call has been cancelled. After this has been emitted, the
+ * call's `cancelled` property will be set to `true`.
+ * @event grpc~ServerReadableStream~cancelled
+ */
+
util.inherits(ServerReadableStream, Readable);
/**
* A stream that the server can read from. Used for calls that are streaming
* from the client side.
- * @constructor
- * @param {grpc.Call} call The call object to read data with
- * @param {function(Buffer):*=} deserialize Deserialization function for reads
+ * @constructor grpc~ServerReadableStream
+ * @extends external:Readable
+ * @borrows grpc~ServerUnaryCall#sendMetadata as
+ * grpc~ServerReadableStream#sendMetadata
+ * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerReadableStream#getPeer
+ * @param {grpc.internal~Call} call The call object to read data with
+ * @param {grpc.Metadata} metadata The request metadata from the client
+ * @param {grpc~deserialize} deserialize Deserialization function for reads
*/
-function ServerReadableStream(call, deserialize) {
+function ServerReadableStream(call, metadata, deserialize) {
Readable.call(this, {objectMode: true});
this.call = call;
setUpReadable(this, deserialize);
+ /**
+ * Indicates if the call has been cancelled
+ * @member {boolean} grpc~ServerReadableStream#cancelled
+ */
+ this.cancelled = false;
+ /**
+ * The request metadata from the client
+ * @member {grpc.Metadata} grpc~ServerReadableStream#metadata
+ */
+ this.metadata = metadata;
}
/**
@@ -381,22 +440,43 @@ function _read(size) {
ServerReadableStream.prototype._read = _read;
+/**
+ * Emitted when the call has been cancelled. After this has been emitted, the
+ * call's `cancelled` property will be set to `true`.
+ * @event grpc~ServerDuplexStream~cancelled
+ */
+
util.inherits(ServerDuplexStream, Duplex);
/**
* A stream that the server can read from or write to. Used for calls with
* duplex streaming.
- * @constructor
- * @param {grpc.Call} call Call object to proxy
- * @param {function(*):Buffer=} serialize Serialization function for requests
- * @param {function(Buffer):*=} deserialize Deserialization function for
+ * @constructor grpc~ServerDuplexStream
+ * @extends external:Duplex
+ * @borrows grpc~ServerUnaryCall#sendMetadata as
+ * grpc~ServerDuplexStream#sendMetadata
+ * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerDuplexStream#getPeer
+ * @param {grpc.internal~Call} call Call object to proxy
+ * @param {grpc.Metadata} metadata The request metadata from the client
+ * @param {grpc~serialize} serialize Serialization function for requests
+ * @param {grpc~deserialize} deserialize Deserialization function for
* responses
*/
-function ServerDuplexStream(call, serialize, deserialize) {
+function ServerDuplexStream(call, metadata, serialize, deserialize) {
Duplex.call(this, {objectMode: true});
this.call = call;
setUpWritable(this, serialize);
setUpReadable(this, deserialize);
+ /**
+ * Indicates if the call has been cancelled
+ * @member {boolean} grpc~ServerReadableStream#cancelled
+ */
+ this.cancelled = false;
+ /**
+ * The request metadata from the client
+ * @member {grpc.Metadata} grpc~ServerReadableStream#metadata
+ */
+ this.metadata = metadata;
}
ServerDuplexStream.prototype._read = _read;
@@ -404,6 +484,7 @@ ServerDuplexStream.prototype._write = _write;
/**
* Send the initial metadata for a writable stream.
+ * @alias grpc~ServerUnaryCall#sendMetadata
* @param {Metadata} responseMetadata Metadata to send
*/
function sendMetadata(responseMetadata) {
@@ -430,6 +511,7 @@ ServerDuplexStream.prototype.sendMetadata = sendMetadata;
/**
* Get the endpoint this call/stream is connected to.
+ * @alias grpc~ServerUnaryCall#getPeer
* @return {string} The URI of the endpoint
*/
function getPeer() {
@@ -445,6 +527,7 @@ ServerDuplexStream.prototype.getPeer = getPeer;
/**
* Wait for the client to close, then emit a cancelled event if the client
* cancelled.
+ * @private
*/
function waitForCancel() {
/* jshint validthis: true */
@@ -468,18 +551,41 @@ ServerWritableStream.prototype.waitForCancel = waitForCancel;
ServerDuplexStream.prototype.waitForCancel = waitForCancel;
/**
+ * Callback function passed to server handlers that handle methods with unary
+ * responses.
+ * @callback grpc.Server~sendUnaryData
+ * @param {grpc~ServiceError} error An error, if the call failed
+ * @param {*} value The response value. Must be a valid argument to the
+ * `responseSerialize` method of the method that is being handled
+ * @param {grpc.Metadata=} trailer Trailing metadata to send, if applicable
+ * @param {grpc.writeFlags=} flags Flags to modify writing the response
+ */
+
+/**
+ * User-provided method to handle unary requests on a server
+ * @callback grpc.Server~handleUnaryCall
+ * @param {grpc~ServerUnaryCall} call The call object
+ * @param {grpc.Server~sendUnaryData} callback The callback to call to respond
+ * to the request
+ */
+
+/**
* Fully handle a unary call
- * @access private
- * @param {grpc.Call} call The call to handle
+ * @private
+ * @param {grpc.internal~Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
- * @param {Metadata} metadata Metadata from the client
+ * @param {grpc~Server.handleUnaryCall} handler.func The handler function
+ * @param {grpc~deserialize} handler.deserialize The deserialization function
+ * for request data
+ * @param {grpc~serialize} handler.serialize The serialization function for
+ * response data
+ * @param {grpc.Metadata} metadata Metadata from the client
*/
function handleUnary(call, handler, metadata) {
- var emitter = new ServerUnaryCall(call);
+ var emitter = new ServerUnaryCall(call, metadata);
emitter.on('error', function(error) {
handleError(call, error);
});
- emitter.metadata = metadata;
emitter.waitForCancel();
var batch = {};
batch[grpc.opType.RECV_MESSAGE] = true;
@@ -512,16 +618,27 @@ function handleUnary(call, handler, metadata) {
}
/**
+ * User provided method to handle server streaming methods on the server.
+ * @callback grpc.Server~handleServerStreamingCall
+ * @param {grpc~ServerWritableStream} call The call object
+ */
+
+/**
* Fully handle a server streaming call
- * @access private
- * @param {grpc.Call} call The call to handle
+ * @private
+ * @param {grpc.internal~Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
- * @param {Metadata} metadata Metadata from the client
+ * @param {grpc~Server.handleServerStreamingCall} handler.func The handler
+ * function
+ * @param {grpc~deserialize} handler.deserialize The deserialization function
+ * for request data
+ * @param {grpc~serialize} handler.serialize The serialization function for
+ * response data
+ * @param {grpc.Metadata} metadata Metadata from the client
*/
function handleServerStreaming(call, handler, metadata) {
- var stream = new ServerWritableStream(call, handler.serialize);
+ var stream = new ServerWritableStream(call, metadata, handler.serialize);
stream.waitForCancel();
- stream.metadata = metadata;
var batch = {};
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
@@ -541,19 +658,32 @@ function handleServerStreaming(call, handler, metadata) {
}
/**
+ * User provided method to handle client streaming methods on the server.
+ * @callback grpc.Server~handleClientStreamingCall
+ * @param {grpc~ServerReadableStream} call The call object
+ * @param {grpc.Server~sendUnaryData} callback The callback to call to respond
+ * to the request
+ */
+
+/**
* Fully handle a client streaming call
* @access private
- * @param {grpc.Call} call The call to handle
+ * @param {grpc.internal~Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
- * @param {Metadata} metadata Metadata from the client
+ * @param {grpc~Server.handleClientStreamingCall} handler.func The handler
+ * function
+ * @param {grpc~deserialize} handler.deserialize The deserialization function
+ * for request data
+ * @param {grpc~serialize} handler.serialize The serialization function for
+ * response data
+ * @param {grpc.Metadata} metadata Metadata from the client
*/
function handleClientStreaming(call, handler, metadata) {
- var stream = new ServerReadableStream(call, handler.deserialize);
+ var stream = new ServerReadableStream(call, metadata, handler.deserialize);
stream.on('error', function(error) {
handleError(call, error);
});
stream.waitForCancel();
- stream.metadata = metadata;
handler.func(stream, function(err, value, trailer, flags) {
stream.terminate();
if (err) {
@@ -568,17 +698,28 @@ function handleClientStreaming(call, handler, metadata) {
}
/**
+ * User provided method to handle bidirectional streaming calls on the server.
+ * @callback grpc.Server~handleBidiStreamingCall
+ * @param {grpc~ServerDuplexStream} call The call object
+ */
+
+/**
* Fully handle a bidirectional streaming call
- * @access private
- * @param {grpc.Call} call The call to handle
+ * @private
+ * @param {grpc.internal~Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
+ * @param {grpc~Server.handleBidiStreamingCall} handler.func The handler
+ * function
+ * @param {grpc~deserialize} handler.deserialize The deserialization function
+ * for request data
+ * @param {grpc~serialize} handler.serialize The serialization function for
+ * response data
* @param {Metadata} metadata Metadata from the client
*/
function handleBidiStreaming(call, handler, metadata) {
- var stream = new ServerDuplexStream(call, handler.serialize,
+ var stream = new ServerDuplexStream(call, metadata, handler.serialize,
handler.deserialize);
stream.waitForCancel();
- stream.metadata = metadata;
handler.func(stream);
}
@@ -592,96 +733,90 @@ var streamHandlers = {
/**
* Constructs a server object that stores request handlers and delegates
* incoming requests to those handlers
+ * @memberof grpc
* @constructor
* @param {Object=} options Options that should be passed to the internal server
* implementation
+ * @example
+ * var server = new grpc.Server();
+ * server.addProtoService(protobuf_service_descriptor, service_implementation);
+ * server.bind('address:port', server_credential);
+ * server.start();
*/
function Server(options) {
this.handlers = {};
- var handlers = this.handlers;
var server = new grpc.Server(options);
this._server = server;
this.started = false;
+}
+
+/**
+ * Start the server and begin handling requests
+ */
+Server.prototype.start = function() {
+ if (this.started) {
+ throw new Error('Server is already running');
+ }
+ var self = this;
+ this.started = true;
+ this._server.start();
/**
- * Start the server and begin handling requests
- * @this Server
+ * Handles the SERVER_RPC_NEW event. If there is a handler associated with
+ * the requested method, use that handler to respond to the request. Then
+ * wait for the next request
+ * @param {grpc.internal~Event} event The event to handle with tag
+ * SERVER_RPC_NEW
*/
- this.start = function() {
- if (this.started) {
- throw new Error('Server is already running');
+ function handleNewCall(err, event) {
+ if (err) {
+ return;
}
- this.started = true;
- server.start();
- /**
- * Handles the SERVER_RPC_NEW event. If there is a handler associated with
- * the requested method, use that handler to respond to the request. Then
- * wait for the next request
- * @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW
- */
- function handleNewCall(err, event) {
- if (err) {
- return;
- }
- var details = event.new_call;
- var call = details.call;
- var method = details.method;
- var metadata = Metadata._fromCoreRepresentation(details.metadata);
- if (method === null) {
- return;
- }
- server.requestCall(handleNewCall);
- var handler;
- if (handlers.hasOwnProperty(method)) {
- handler = handlers[method];
- } else {
- var batch = {};
- batch[grpc.opType.SEND_INITIAL_METADATA] =
- (new Metadata())._getCoreRepresentation();
- batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
- code: constants.status.UNIMPLEMENTED,
- details: '',
- metadata: {}
- };
- batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
- call.startBatch(batch, function() {});
- return;
- }
- streamHandlers[handler.type](call, handler, metadata);
+ var details = event.new_call;
+ var call = details.call;
+ var method = details.method;
+ var metadata = Metadata._fromCoreRepresentation(details.metadata);
+ if (method === null) {
+ return;
}
- server.requestCall(handleNewCall);
- };
-
- /**
- * Gracefully shuts down the server. The server will stop receiving new calls,
- * and any pending calls will complete. The callback will be called when all
- * pending calls have completed and the server is fully shut down. This method
- * is idempotent with itself and forceShutdown.
- * @param {function()} callback The shutdown complete callback
- */
- this.tryShutdown = function(callback) {
- server.tryShutdown(callback);
- };
+ self._server.requestCall(handleNewCall);
+ var handler;
+ if (self.handlers.hasOwnProperty(method)) {
+ handler = self.handlers[method];
+ } else {
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] =
+ (new Metadata())._getCoreRepresentation();
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: constants.status.UNIMPLEMENTED,
+ details: '',
+ metadata: {}
+ };
+ batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ call.startBatch(batch, function() {});
+ return;
+ }
+ streamHandlers[handler.type](call, handler, metadata);
+ }
+ this._server.requestCall(handleNewCall);
+};
- /**
- * Forcibly shuts down the server. The server will stop receiving new calls
- * and cancel all pending calls. When it returns, the server has shut down.
- * This method is idempotent with itself and tryShutdown, and it will trigger
- * any outstanding tryShutdown callbacks.
- */
- this.forceShutdown = function() {
- server.forceShutdown();
- };
-}
+/**
+ * Unified type for application handlers for all types of calls
+ * @typedef {(grpc.Server~handleUnaryCall
+ * |grpc.Server~handleClientStreamingCall
+ * |grpc.Server~handleServerStreamingCall
+ * |grpc.Server~handleBidiStreamingCall)} grpc.Server~handleCall
+ */
/**
* Registers a handler to handle the named method. Fails if there already is
* a handler for the given method. Returns true on success
* @param {string} name The name of the method that the provided function should
* handle/respond to.
- * @param {function} handler Function that takes a stream of request values and
- * returns a stream of response values
- * @param {function(*):Buffer} serialize Serialization function for responses
- * @param {function(Buffer):*} deserialize Deserialization function for requests
+ * @param {grpc.Server~handleCall} handler Function that takes a stream of
+ * request values and returns a stream of response values
+ * @param {grpc~serialize} serialize Serialization function for responses
+ * @param {grpc~deserialize} deserialize Deserialization function for requests
* @param {string} type The streaming type of method that this handles
* @return {boolean} True if the handler was set. False if a handler was already
* set for that name.
@@ -700,6 +835,27 @@ Server.prototype.register = function(name, handler, serialize, deserialize,
return true;
};
+/**
+ * Gracefully shuts down the server. The server will stop receiving new calls,
+ * and any pending calls will complete. The callback will be called when all
+ * pending calls have completed and the server is fully shut down. This method
+ * is idempotent with itself and forceShutdown.
+ * @param {function()} callback The shutdown complete callback
+ */
+Server.prototype.tryShutdown = function(callback) {
+ this._server.tryShutdown(callback);
+};
+
+/**
+ * Forcibly shuts down the server. The server will stop receiving new calls
+ * and cancel all pending calls. When it returns, the server has shut down.
+ * This method is idempotent with itself and tryShutdown, and it will trigger
+ * any outstanding tryShutdown callbacks.
+ */
+Server.prototype.forceShutdown = function() {
+ this._server.forceShutdown();
+};
+
var unimplementedStatusResponse = {
code: constants.status.UNIMPLEMENTED,
details: 'The server does not implement this method'
@@ -721,13 +877,10 @@ var defaultHandler = {
};
/**
- * Add a service to the server, with a corresponding implementation. If you are
- * generating this from a proto file, you should instead use
- * addProtoService.
- * @param {Object<String, *>} service The service descriptor, as
- * {@link module:src/common.getProtobufServiceAttrs} returns
- * @param {Object<String, function>} implementation Map of method names to
- * method implementation for the provided service.
+ * Add a service to the server, with a corresponding implementation.
+ * @param {grpc~ServiceDefinition} service The service descriptor
+ * @param {Object<String, grpc.Server~handleCall>} implementation Map of method
+ * names to method implementation for the provided service.
*/
Server.prototype.addService = function(service, implementation) {
if (!_.isObject(service) || !_.isObject(implementation)) {
@@ -788,10 +941,10 @@ var logAddProtoServiceDeprecationOnce = _.once(function() {
/**
* Add a proto service to the server, with a corresponding implementation
- * @deprecated Use grpc.load and Server#addService instead
+ * @deprecated Use {@link grpc.Server#addService} instead
* @param {Protobuf.Reflect.Service} service The proto service descriptor
- * @param {Object<String, function>} implementation Map of method names to
- * method implementation for the provided service.
+ * @param {Object<String, grpc.Server~handleCall>} implementation Map of method
+ * names to method implementation for the provided service.
*/
Server.prototype.addProtoService = function(service, implementation) {
var options;
@@ -815,10 +968,11 @@ Server.prototype.addProtoService = function(service, implementation) {
};
/**
- * Binds the server to the given port, with SSL enabled if creds is given
+ * Binds the server to the given port, with SSL disabled if creds is an
+ * insecure credentials object
* @param {string} port The port that the server should bind on, in the format
* "address:port"
- * @param {ServerCredentials=} creds Server credential object to be used for
+ * @param {grpc.ServerCredentials} creds Server credential object to be used for
* SSL. Pass an insecure credentials object for an insecure port.
*/
Server.prototype.bind = function(port, creds) {
@@ -828,7 +982,4 @@ Server.prototype.bind = function(port, creds) {
return this._server.addHttp2Port(port, creds);
};
-/**
- * @see module:src/server~Server
- */
exports.Server = Server;
diff --git a/src/node/test/common_test.js b/src/node/test/common_test.js
index b7c2c6a8d6..db80207e22 100644
--- a/src/node/test/common_test.js
+++ b/src/node/test/common_test.js
@@ -100,7 +100,6 @@ describe('Proto message long int serialize and deserialize', function() {
var longNumDeserialize = deserializeCls(messages_proto.LongValues,
num_options);
var serialized = longSerialize({int_64: pos_value});
- console.log(longDeserialize(serialized));
assert.strictEqual(typeof longDeserialize(serialized).int_64, 'string');
/* With the longsAsStrings option disabled, long values are represented as
* objects with 3 keys: low, high, and unsigned */
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index d2f0511af2..f8eaf62aaf 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -1110,6 +1110,18 @@ describe('Other conditions', function() {
done();
});
});
+ it('after the call has fully completed', function(done) {
+ var peer;
+ var call = client.unary({error: false}, function(err, data) {
+ assert.ifError(err);
+ setImmediate(function() {
+ assert.strictEqual(peer, call.getPeer());
+ done();
+ });
+ });
+ peer = call.getPeer();
+ assert.strictEqual(typeof peer, 'string');
+ });
});
});
describe('Call propagation', function() {
@@ -1322,14 +1334,14 @@ describe('Cancelling surface client', function() {
});
it('Should correctly cancel a unary call', function(done) {
var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
- assert.strictEqual(err.code, surface_client.status.CANCELLED);
+ assert.strictEqual(err.code, grpc.status.CANCELLED);
done();
});
call.cancel();
});
it('Should correctly cancel a client stream call', function(done) {
var call = client.sum(function(err, resp) {
- assert.strictEqual(err.code, surface_client.status.CANCELLED);
+ assert.strictEqual(err.code, grpc.status.CANCELLED);
done();
});
call.cancel();
@@ -1338,7 +1350,7 @@ describe('Cancelling surface client', function() {
var call = client.fib({'limit': 5});
call.on('data', function() {});
call.on('error', function(error) {
- assert.strictEqual(error.code, surface_client.status.CANCELLED);
+ assert.strictEqual(error.code, grpc.status.CANCELLED);
done();
});
call.cancel();
@@ -1347,9 +1359,22 @@ describe('Cancelling surface client', function() {
var call = client.divMany();
call.on('data', function() {});
call.on('error', function(error) {
- assert.strictEqual(error.code, surface_client.status.CANCELLED);
+ assert.strictEqual(error.code, grpc.status.CANCELLED);
done();
});
call.cancel();
});
+ it('Should be idempotent', function(done) {
+ var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
+ assert.strictEqual(err.code, grpc.status.CANCELLED);
+ // Call asynchronously to try cancelling after call is fully completed
+ setImmediate(function() {
+ assert.doesNotThrow(function() {
+ call.cancel();
+ });
+ done();
+ });
+ });
+ call.cancel();
+ });
});
diff --git a/src/node/tools/package.json b/src/node/tools/package.json
index a81aa87f4b..542d52d48b 100644
--- a/src/node/tools/package.json
+++ b/src/node/tools/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc-tools",
- "version": "1.4.0-dev",
+ "version": "1.5.0-dev",
"author": "Google Inc.",
"description": "Tools for developing with gRPC on Node.js",
"homepage": "http://www.grpc.io/",
diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
index 2f29058b59..711814e7fa 100644
--- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
+++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
@@ -42,7 +42,7 @@ Pod::Spec.new do |s|
# exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed
# before them.
s.name = '!ProtoCompiler-gRPCPlugin'
- v = '1.4.0-dev'
+ v = '1.5.0-dev'
s.version = v
s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.'
s.description = <<-DESC
diff --git a/src/objective-c/GRPCClient/private/version.h b/src/objective-c/GRPCClient/private/version.h
index c846f4214c..cacbce4644 100644
--- a/src/objective-c/GRPCClient/private/version.h
+++ b/src/objective-c/GRPCClient/private/version.h
@@ -38,4 +38,4 @@
// `tools/buildgen/generate_projects.sh`.
-#define GRPC_OBJC_VERSION_STRING @"1.4.0-dev"
+#define GRPC_OBJC_VERSION_STRING @"1.5.0-dev"
diff --git a/src/php/composer.json b/src/php/composer.json
index a4fba7e4f6..3a97e5fb41 100644
--- a/src/php/composer.json
+++ b/src/php/composer.json
@@ -2,7 +2,7 @@
"name": "grpc/grpc-dev",
"description": "gRPC library for PHP - for Developement use only",
"license": "BSD-3-Clause",
- "version": "1.4.0",
+ "version": "1.5.0",
"require": {
"php": ">=5.5.0",
"google/protobuf": "^v3.3.0"
diff --git a/src/php/ext/grpc/version.h b/src/php/ext/grpc/version.h
index 993ef2de27..303a63ec36 100644
--- a/src/php/ext/grpc/version.h
+++ b/src/php/ext/grpc/version.h
@@ -35,6 +35,6 @@
#ifndef VERSION_H
#define VERSION_H
-#define PHP_GRPC_VERSION "1.4.0"
+#define PHP_GRPC_VERSION "1.5.0"
#endif /* VERSION_H */
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 4316449ac6..012ed8ec81 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -786,7 +786,7 @@ def _channel_managed_call_management(state):
class _ChannelConnectivityState(object):
def __init__(self, channel):
- self.lock = threading.Lock()
+ self.lock = threading.RLock()
self.channel = channel
self.polling = False
self.connectivity = None
@@ -926,6 +926,11 @@ class Channel(grpc.Channel):
self._call_state = _ChannelCallState(self._channel)
self._connectivity_state = _ChannelConnectivityState(self._channel)
+ # TODO(https://github.com/grpc/grpc/issues/9884)
+ # Temporary work around UNAVAILABLE issues
+ # Remove this once c-core has retry support
+ _subscribe(self._connectivity_state, lambda *args: None, None)
+
def subscribe(self, callback, try_to_connect=None):
_subscribe(self._connectivity_state, callback, try_to_connect)
diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py
index a0cb0dd067..3ae2602d20 100644
--- a/src/python/grpcio/grpc/_grpcio_metadata.py
+++ b/src/python/grpcio/grpc/_grpcio_metadata.py
@@ -29,4 +29,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!!
-__version__ = """1.4.0.dev0"""
+__version__ = """1.5.0.dev0"""
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 38673fcb39..f6578dbd11 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -314,6 +314,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/census/grpc_filter.c',
'src/core/ext/census/grpc_plugin.c',
'src/core/ext/census/initialize.c',
+ 'src/core/ext/census/intrusive_hash_map.c',
'src/core/ext/census/mlog.c',
'src/core/ext/census/operation.c',
'src/core/ext/census/placeholders.c',
diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py
index ea4bc7ba20..f5bd29ff85 100644
--- a/src/python/grpcio/grpc_version.py
+++ b/src/python/grpcio/grpc_version.py
@@ -29,4 +29,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!!
-VERSION='1.4.0.dev0'
+VERSION='1.5.0.dev0'
diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py
index 26aa555e14..26a8301883 100644
--- a/src/python/grpcio_health_checking/grpc_version.py
+++ b/src/python/grpcio_health_checking/grpc_version.py
@@ -29,4 +29,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!!
-VERSION='1.4.0.dev0'
+VERSION='1.5.0.dev0'
diff --git a/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py b/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py
index cd896f32c3..1a7d3259df 100644
--- a/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py
+++ b/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py
@@ -28,8 +28,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Reference implementation for reflection in gRPC Python."""
-import threading
-
import grpc
from google.protobuf import descriptor_pb2
from google.protobuf import descriptor_pool
@@ -120,6 +118,7 @@ class ReflectionServicer(reflection_pb2_grpc.ServerReflectionServicer):
]))
def ServerReflectionInfo(self, request_iterator, context):
+ # pylint: disable=unused-argument
for request in request_iterator:
if request.HasField('file_by_filename'):
yield self._file_by_filename(request.file_by_filename)
@@ -152,4 +151,4 @@ def enable_server_reflection(service_names, server, pool=None):
pool: DescriptorPool object to use (descriptor_pool.Default() if None).
"""
reflection_pb2_grpc.add_ServerReflectionServicer_to_server(
- ReflectionServicer(service_names), server, pool)
+ ReflectionServicer(service_names, pool=pool), server)
diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py
index 978d6b4011..f16737df80 100644
--- a/src/python/grpcio_reflection/grpc_version.py
+++ b/src/python/grpcio_reflection/grpc_version.py
@@ -29,4 +29,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!!
-VERSION='1.4.0.dev0'
+VERSION='1.5.0.dev0'
diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py
index 5f0b084884..28cf8a8a62 100644
--- a/src/python/grpcio_tests/grpc_version.py
+++ b/src/python/grpcio_tests/grpc_version.py
@@ -29,4 +29,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!!
-VERSION='1.4.0.dev0'
+VERSION='1.5.0.dev0'
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index 9f7587faa6..126e8ac60d 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -32,6 +32,7 @@
"unit._invocation_defects_test.InvocationDefectsTest",
"unit._metadata_code_details_test.MetadataCodeDetailsTest",
"unit._metadata_test.MetadataTest",
+ "unit._reconnect_test.ReconnectTest",
"unit._resource_exhausted_test.ResourceExhaustedTest",
"unit._rpc_test.RPCTest",
"unit._sanity._sanity_test.Sanity",
diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py
new file mode 100644
index 0000000000..6c316476b3
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py
@@ -0,0 +1,70 @@
+# Copyright 2017, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+"""Tests that a channel will reconnect if a connection is dropped"""
+
+import unittest
+
+import grpc
+from grpc.framework.foundation import logging_pool
+
+from tests.unit.framework.common import test_constants
+
+_REQUEST = b'\x00\x00\x00'
+_RESPONSE = b'\x00\x00\x01'
+
+_UNARY_UNARY = '/test/UnaryUnary'
+
+
+def _handle_unary_unary(unused_request, unused_servicer_context):
+ return _RESPONSE
+
+
+class ReconnectTest(unittest.TestCase):
+
+ def test_reconnect(self):
+ server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ handler = grpc.method_handlers_generic_handler('test', {
+ 'UnaryUnary':
+ grpc.unary_unary_rpc_method_handler(_handle_unary_unary)
+ })
+ server = grpc.server(server_pool, (handler,))
+ port = server.add_insecure_port('[::]:0')
+ server.start()
+ channel = grpc.insecure_channel('localhost:%d' % port)
+ multi_callable = channel.unary_unary(_UNARY_UNARY)
+ self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
+ server.stop(None)
+ server = grpc.server(server_pool, (handler,))
+ server.add_insecure_port('[::]:{}'.format(port))
+ server.start()
+ self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb
index d3e5373b0b..bed8c43405 100755
--- a/src/ruby/end2end/channel_closing_driver.rb
+++ b/src/ruby/end2end/channel_closing_driver.rb
@@ -61,6 +61,11 @@ def main
'channel is closed while connectivity is watched'
end
+ client_exit_code = $CHILD_STATUS
+ if client_exit_code != 0
+ fail "channel closing client failed, exit code #{client_exit_code}"
+ end
+
server_runner.stop
end
diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb
index 80fb62899e..9910076dba 100755
--- a/src/ruby/end2end/channel_state_driver.rb
+++ b/src/ruby/end2end/channel_state_driver.rb
@@ -58,6 +58,9 @@ def main
'It likely hangs when ended abruptly'
end
+ # The interrupt in the child process should cause it to
+ # exit a non-zero status, so don't check it here.
+ # This test mainly tries to catch deadlock.
server_runner.stop
end
diff --git a/src/ruby/end2end/grpc_class_init_client.rb b/src/ruby/end2end/grpc_class_init_client.rb
index ee79292119..e73ca76850 100755
--- a/src/ruby/end2end/grpc_class_init_client.rb
+++ b/src/ruby/end2end/grpc_class_init_client.rb
@@ -34,44 +34,110 @@
require_relative './end2end_common'
-def main
- grpc_class = ''
- OptionParser.new do |opts|
- opts.on('--grpc_class=P', String) do |p|
- grpc_class = p
+def construct_many(test_proc)
+ thds = []
+ 4.times do
+ thds << Thread.new do
+ 20.times do
+ test_proc.call
+ end
end
- end.parse!
+ end
+ 20.times do
+ test_proc.call
+ end
+ thds.each(&:join)
+end
+
+def run_gc_stress_test(test_proc)
+ GC.disable
+ construct_many(test_proc)
- test_proc = nil
+ GC.enable
+ construct_many(test_proc)
+
+ GC.start(full_mark: true, immediate_sweep: true)
+ construct_many(test_proc)
+end
+
+def run_concurrency_stress_test(test_proc)
+ 100.times do
+ Thread.new do
+ test_proc.call
+ end
+ end
+
+ test_proc.call
+
+ fail 'exception thrown while child thread initing class'
+end
+# default (no gc_stress and no concurrency_stress)
+def run_default_test(test_proc)
+ thd = Thread.new do
+ test_proc.call
+ end
+ test_proc.call
+ thd.join
+end
+
+def get_test_proc(grpc_class)
case grpc_class
when 'channel'
- test_proc = proc do
+ return proc do
GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure)
end
when 'server'
- test_proc = proc do
+ return proc do
GRPC::Core::Server.new({})
end
when 'channel_credentials'
- test_proc = proc do
+ return proc do
GRPC::Core::ChannelCredentials.new
end
when 'call_credentials'
- test_proc = proc do
+ return proc do
GRPC::Core::CallCredentials.new(proc { |noop| noop })
end
when 'compression_options'
- test_proc = proc do
+ return proc do
GRPC::Core::CompressionOptions.new
end
else
fail "bad --grpc_class=#{grpc_class} param"
end
+end
- th = Thread.new { test_proc.call }
- test_proc.call
- th.join
+def main
+ grpc_class = ''
+ stress_test = ''
+ OptionParser.new do |opts|
+ opts.on('--grpc_class=P', String) do |p|
+ grpc_class = p
+ end
+ opts.on('--stress_test=P') do |p|
+ stress_test = p
+ end
+ end.parse!
+
+ test_proc = get_test_proc(grpc_class)
+
+ # the different test configs need to be ran
+ # in separate processes, since each one tests
+ # clean shutdown in a different way
+ case stress_test
+ when 'gc'
+ p 'run gc stress'
+ run_gc_stress_test(test_proc)
+ when 'concurrency'
+ p 'run concurrency stress'
+ run_concurrency_stress_test(test_proc)
+ when ''
+ p 'run default'
+ run_default_test(test_proc)
+ else
+ fail "bad --stress_test=#{stress_test} param"
+ end
end
main
diff --git a/src/ruby/end2end/grpc_class_init_driver.rb b/src/ruby/end2end/grpc_class_init_driver.rb
index 764d029f14..c65ed547c5 100755
--- a/src/ruby/end2end/grpc_class_init_driver.rb
+++ b/src/ruby/end2end/grpc_class_init_driver.rb
@@ -38,29 +38,40 @@ def main
call_credentials
compression_options )
- native_grpc_classes.each do |grpc_class|
- STDERR.puts 'start client'
- this_dir = File.expand_path(File.dirname(__FILE__))
- client_path = File.join(this_dir, 'grpc_class_init_client.rb')
- client_pid = Process.spawn(RbConfig.ruby,
- client_path,
- "--grpc_class=#{grpc_class}")
- begin
- Timeout.timeout(10) do
- Process.wait(client_pid)
+ # there is room for false positives in this test,
+ # do a few runs for each config
+ 4.times do
+ native_grpc_classes.each do |grpc_class|
+ ['', 'gc', 'concurrency'].each do |stress_test_type|
+ STDERR.puts 'start client'
+ this_dir = File.expand_path(File.dirname(__FILE__))
+ client_path = File.join(this_dir, 'grpc_class_init_client.rb')
+ client_pid = Process.spawn(RbConfig.ruby,
+ client_path,
+ "--grpc_class=#{grpc_class}",
+ "--stress_test=#{stress_test_type}")
+ begin
+ Timeout.timeout(10) do
+ Process.wait(client_pid)
+ end
+ rescue Timeout::Error
+ STDERR.puts "timeout waiting for client pid #{client_pid}"
+ Process.kill('SIGKILL', client_pid)
+ Process.wait(client_pid)
+ STDERR.puts 'killed client child'
+ raise 'Timed out waiting for client process. ' \
+ 'It likely hangs when the first constructed gRPC object has ' \
+ "type: #{grpc_class}"
+ end
+
+ client_exit_code = $CHILD_STATUS
+ # concurrency stress test type is expected to exit with a
+ # non-zero status due to an exception being raised
+ if client_exit_code != 0 && stress_test_type != 'concurrency'
+ fail "client failed, exit code #{client_exit_code}"
+ end
end
- rescue Timeout::Error
- STDERR.puts "timeout waiting for client pid #{client_pid}"
- Process.kill('SIGKILL', client_pid)
- Process.wait(client_pid)
- STDERR.puts 'killed client child'
- raise 'Timed out waiting for client process. ' \
- 'It likely hangs when the first constructed gRPC object has ' \
- "type: #{grpc_class}"
end
-
- client_exit_code = $CHILD_STATUS
- fail "client failed, exit code #{client_exit_code}" if client_exit_code != 0
end
end
diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb
new file mode 100755
index 0000000000..206ec8e801
--- /dev/null
+++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb
@@ -0,0 +1,63 @@
+#!/usr/bin/env ruby
+
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require_relative './end2end_common'
+
+Thread.abort_on_exception = true
+
+include GRPC::Core::ConnectivityStates
+
+def watch_state(ch)
+ thd = Thread.new do
+ state = ch.connectivity_state(false)
+ fail "non-idle state: #{state}" unless state == IDLE
+ ch.watch_connectivity_state(IDLE, Time.now + 360)
+ end
+ sleep 0.1
+ thd.kill
+end
+
+def main
+ channels = []
+ 10.times do
+ ch = GRPC::Core::Channel.new('dummy_host',
+ nil, :this_channel_is_insecure)
+ watch_state(ch)
+ channels << ch
+ end
+
+ # checking state should still be safe to call
+ channels.each do |c|
+ fail unless c.connectivity_state(false) == FATAL_FAILURE
+ end
+end
+
+main
diff --git a/src/ruby/end2end/sig_int_during_channel_watch_client.rb b/src/ruby/end2end/sig_int_during_channel_watch_client.rb
index 389fc5ba33..0c6a374925 100755
--- a/src/ruby/end2end/sig_int_during_channel_watch_client.rb
+++ b/src/ruby/end2end/sig_int_during_channel_watch_client.rb
@@ -46,6 +46,8 @@ def main
end
end.parse!
+ trap('SIGINT') { exit 0 }
+
thd = Thread.new do
child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}",
{},
diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
index 670cda0919..79a8c133fa 100755
--- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
+++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
@@ -63,6 +63,11 @@ def main
'SIGINT is sent while there is an active connectivity_state call'
end
+ client_exit_code = $CHILD_STATUS
+ if client_exit_code != 0
+ fail "sig_int_during_channel_watch_client failed: #{client_exit_code}"
+ end
+
server_runner.stop
end
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index a802183726..f65388448a 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -68,29 +68,53 @@ static VALUE grpc_rb_cChannel = Qnil;
/* Used during the conversion of a hash to channel args during channel setup */
static VALUE grpc_rb_cChannelArgs;
+typedef struct bg_watched_channel {
+ grpc_channel *channel;
+ // these fields must only be accessed under global_connection_polling_mu
+ struct bg_watched_channel *next;
+ int channel_destroyed;
+ int refcount;
+} bg_watched_channel;
+
/* grpc_rb_channel wraps a grpc_channel. */
typedef struct grpc_rb_channel {
VALUE credentials;
- /* The actual channel */
- grpc_channel *wrapped;
- int request_safe_destroy;
- int safe_to_destroy;
- grpc_connectivity_state current_connectivity_state;
-
- int mu_init_done;
- int abort_watch_connectivity_state;
- gpr_mu channel_mu;
- gpr_cv channel_cv;
+ /* The actual channel (protected in a wrapper to tell when it's safe to
+ * destroy) */
+ bg_watched_channel *bg_wrapped;
} grpc_rb_channel;
-/* Forward declarations of functions involved in temporary fix to
- * https://github.com/grpc/grpc/issues/9941 */
+typedef enum { CONTINUOUS_WATCH, WATCH_STATE_API } watch_state_op_type;
+
+typedef struct watch_state_op {
+ watch_state_op_type op_type;
+ // from event.success
+ union {
+ struct {
+ int success;
+ // has been called back due to a cq next call
+ int called_back;
+ } api_callback_args;
+ struct {
+ bg_watched_channel *bg;
+ } continuous_watch_callback_args;
+ } op;
+} watch_state_op;
+
+static bg_watched_channel *bg_watched_channel_list_head = NULL;
+
static void grpc_rb_channel_try_register_connection_polling(
- grpc_rb_channel *wrapper);
-static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper);
+ bg_watched_channel *bg);
static void *wait_until_channel_polling_thread_started_no_gil(void *);
static void wait_until_channel_polling_thread_started_unblocking_func(void *);
+static void *channel_init_try_register_connection_polling_without_gil(
+ void *arg);
+
+typedef struct channel_init_try_register_stack {
+ grpc_channel *channel;
+ grpc_rb_channel *wrapper;
+} channel_init_try_register_stack;
static grpc_completion_queue *channel_polling_cq;
static gpr_mu global_connection_polling_mu;
@@ -98,6 +122,42 @@ static gpr_cv global_connection_polling_cv;
static int abort_channel_polling = 0;
static int channel_polling_thread_started = 0;
+static int bg_watched_channel_list_lookup(bg_watched_channel *bg);
+static bg_watched_channel *bg_watched_channel_list_create_and_add(
+ grpc_channel *channel);
+static void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg);
+static void run_poll_channels_loop_unblocking_func(void *arg);
+
+// Needs to be called under global_connection_polling_mu
+static void grpc_rb_channel_watch_connection_state_op_complete(
+ watch_state_op *op, int success) {
+ GPR_ASSERT(!op->op.api_callback_args.called_back);
+ op->op.api_callback_args.called_back = 1;
+ op->op.api_callback_args.success = success;
+ // wake up the watch API call thats waiting on this op
+ gpr_cv_broadcast(&global_connection_polling_cv);
+}
+
+/* Avoids destroying a channel twice. */
+static void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) {
+ gpr_mu_lock(&global_connection_polling_mu);
+ GPR_ASSERT(bg_watched_channel_list_lookup(bg));
+ if (!bg->channel_destroyed) {
+ grpc_channel_destroy(bg->channel);
+ bg->channel_destroyed = 1;
+ }
+ bg->refcount--;
+ if (bg->refcount == 0) {
+ bg_watched_channel_list_free_and_remove(bg);
+ }
+ gpr_mu_unlock(&global_connection_polling_mu);
+}
+
+static void *channel_safe_destroy_without_gil(void *arg) {
+ grpc_rb_channel_safe_destroy((bg_watched_channel *)arg);
+ return NULL;
+}
+
/* Destroys Channel instances. */
static void grpc_rb_channel_free(void *p) {
grpc_rb_channel *ch = NULL;
@@ -106,14 +166,13 @@ static void grpc_rb_channel_free(void *p) {
};
ch = (grpc_rb_channel *)p;
- if (ch->wrapped != NULL) {
- grpc_rb_channel_safe_destroy(ch);
- ch->wrapped = NULL;
- }
-
- if (ch->mu_init_done) {
- gpr_mu_destroy(&ch->channel_mu);
- gpr_cv_destroy(&ch->channel_cv);
+ if (ch->bg_wrapped != NULL) {
+ /* assumption made here: it's ok to directly gpr_mu_lock the global
+ * connection polling mutex becuse we're in a finalizer,
+ * and we can count on this thread to not be interrupted or
+ * yield the gil. */
+ grpc_rb_channel_safe_destroy(ch->bg_wrapped);
+ ch->bg_wrapped = NULL;
}
xfree(p);
@@ -146,7 +205,7 @@ static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
/* Allocates grpc_rb_channel instances. */
static VALUE grpc_rb_channel_alloc(VALUE cls) {
grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel);
- wrapper->wrapped = NULL;
+ wrapper->bg_wrapped = NULL;
wrapper->credentials = Qnil;
return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper);
}
@@ -168,18 +227,21 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
grpc_channel_credentials *creds = NULL;
char *target_chars = NULL;
grpc_channel_args args;
+ channel_init_try_register_stack stack;
+ int stop_waiting_for_thread_start = 0;
MEMZERO(&args, grpc_channel_args, 1);
grpc_ruby_once_init();
rb_thread_call_without_gvl(
- wait_until_channel_polling_thread_started_no_gil, NULL,
- wait_until_channel_polling_thread_started_unblocking_func, NULL);
+ wait_until_channel_polling_thread_started_no_gil,
+ &stop_waiting_for_thread_start,
+ wait_until_channel_polling_thread_started_unblocking_func,
+ &stop_waiting_for_thread_start);
/* "3" == 3 mandatory args */
rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- wrapper->mu_init_done = 0;
target_chars = StringValueCStr(target);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
if (TYPE(credentials) == T_SYMBOL) {
@@ -196,24 +258,11 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
}
GPR_ASSERT(ch);
-
- wrapper->wrapped = ch;
-
- gpr_mu_init(&wrapper->channel_mu);
- gpr_cv_init(&wrapper->channel_cv);
- wrapper->mu_init_done = 1;
-
- gpr_mu_lock(&wrapper->channel_mu);
- wrapper->abort_watch_connectivity_state = 0;
- wrapper->current_connectivity_state =
- grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
- wrapper->safe_to_destroy = 0;
- wrapper->request_safe_destroy = 0;
-
- gpr_cv_broadcast(&wrapper->channel_cv);
- gpr_mu_unlock(&wrapper->channel_mu);
-
- grpc_rb_channel_try_register_connection_polling(wrapper);
+ stack.channel = ch;
+ stack.wrapper = wrapper;
+ rb_thread_call_without_gvl(
+ channel_init_try_register_connection_polling_without_gil, &stack, NULL,
+ NULL);
if (args.args != NULL) {
xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
@@ -224,10 +273,31 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
return Qnil;
}
rb_ivar_set(self, id_target, target);
- wrapper->wrapped = ch;
return self;
}
+typedef struct get_state_stack {
+ bg_watched_channel *bg;
+ int try_to_connect;
+ int out;
+} get_state_stack;
+
+static void *get_state_without_gil(void *arg) {
+ get_state_stack *stack = (get_state_stack *)arg;
+
+ gpr_mu_lock(&global_connection_polling_mu);
+ GPR_ASSERT(abort_channel_polling || channel_polling_thread_started);
+ if (stack->bg->channel_destroyed) {
+ stack->out = GRPC_CHANNEL_SHUTDOWN;
+ } else {
+ stack->out = grpc_channel_check_connectivity_state(stack->bg->channel,
+ stack->try_to_connect);
+ }
+ gpr_mu_unlock(&global_connection_polling_mu);
+
+ return NULL;
+}
+
/*
call-seq:
ch.connectivity_state -> state
@@ -240,59 +310,69 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
VALUE self) {
VALUE try_to_connect_param = Qfalse;
- int grpc_try_to_connect = 0;
grpc_rb_channel *wrapper = NULL;
- grpc_channel *ch = NULL;
+ get_state_stack stack;
/* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
rb_scan_args(argc, argv, "01", &try_to_connect_param);
- grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- ch = wrapper->wrapped;
- if (ch == NULL) {
+ if (wrapper->bg_wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
- return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped,
- grpc_try_to_connect));
+
+ stack.bg = wrapper->bg_wrapped;
+ stack.try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
+ rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL);
+
+ return LONG2NUM(stack.out);
}
typedef struct watch_state_stack {
- grpc_rb_channel *wrapper;
+ grpc_channel *channel;
gpr_timespec deadline;
int last_state;
} watch_state_stack;
-static void *watch_channel_state_without_gvl(void *arg) {
+static void *wait_for_watch_state_op_complete_without_gvl(void *arg) {
watch_state_stack *stack = (watch_state_stack *)arg;
- gpr_timespec deadline = stack->deadline;
- grpc_rb_channel *wrapper = stack->wrapper;
- int last_state = stack->last_state;
- void *return_value = (void *)0;
+ watch_state_op *op = NULL;
+ void *success = (void *)0;
- gpr_mu_lock(&wrapper->channel_mu);
- while (wrapper->current_connectivity_state == last_state &&
- !wrapper->request_safe_destroy && !wrapper->safe_to_destroy &&
- !wrapper->abort_watch_connectivity_state &&
- gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
- gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
+ gpr_mu_lock(&global_connection_polling_mu);
+ // its unsafe to do a "watch" after "channel polling abort" because the cq has
+ // been shut down.
+ if (abort_channel_polling) {
+ gpr_mu_unlock(&global_connection_polling_mu);
+ return (void *)0;
}
- if (wrapper->current_connectivity_state != last_state) {
- return_value = (void *)1;
+ op = gpr_zalloc(sizeof(watch_state_op));
+ op->op_type = WATCH_STATE_API;
+ grpc_channel_watch_connectivity_state(stack->channel, stack->last_state,
+ stack->deadline, channel_polling_cq,
+ op);
+
+ while (!op->op.api_callback_args.called_back) {
+ gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
}
- gpr_mu_unlock(&wrapper->channel_mu);
+ if (op->op.api_callback_args.success) {
+ success = (void *)1;
+ }
+ gpr_free(op);
+ gpr_mu_unlock(&global_connection_polling_mu);
- return return_value;
+ return success;
}
-
-static void watch_channel_state_unblocking_func(void *arg) {
- grpc_rb_channel *wrapper = (grpc_rb_channel *)arg;
- gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called");
- gpr_mu_lock(&wrapper->channel_mu);
- wrapper->abort_watch_connectivity_state = 1;
- gpr_cv_broadcast(&wrapper->channel_cv);
- gpr_mu_unlock(&wrapper->channel_mu);
+static void wait_for_watch_state_op_complete_unblocking_func(void *arg) {
+ bg_watched_channel *bg = (bg_watched_channel *)arg;
+ gpr_mu_lock(&global_connection_polling_mu);
+ if (!bg->channel_destroyed) {
+ grpc_channel_destroy(bg->channel);
+ bg->channel_destroyed = 1;
+ }
+ gpr_mu_unlock(&global_connection_polling_mu);
}
/* Wait until the channel's connectivity state becomes different from
@@ -307,11 +387,11 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
watch_state_stack stack;
- void *out;
+ void *op_success = 0;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- if (wrapper->wrapped == NULL) {
+ if (wrapper->bg_wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
@@ -323,16 +403,15 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
return Qnil;
}
- stack.wrapper = wrapper;
- stack.deadline = grpc_rb_time_timeval(deadline, 0);
+ stack.channel = wrapper->bg_wrapped->channel;
+ stack.deadline = grpc_rb_time_timeval(deadline, 0),
stack.last_state = NUM2LONG(last_state);
- out =
- rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack,
- watch_channel_state_unblocking_func, wrapper);
- if (out) {
- return Qtrue;
- }
- return Qfalse;
+
+ op_success = rb_thread_call_without_gvl(
+ wait_for_watch_state_op_complete_without_gvl, &stack,
+ wait_for_watch_state_op_complete_unblocking_func, wrapper->bg_wrapped);
+
+ return op_success ? Qtrue : Qfalse;
}
/* Create a call given a grpc_channel, in order to call method. The request
@@ -344,7 +423,6 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL;
grpc_call *parent_call = NULL;
- grpc_channel *ch = NULL;
grpc_completion_queue *cq = NULL;
int flags = GRPC_PROPAGATE_DEFAULTS;
grpc_slice method_slice;
@@ -366,8 +444,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
cq = grpc_completion_queue_create_for_pluck(NULL);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- ch = wrapper->wrapped;
- if (ch == NULL) {
+ if (wrapper->bg_wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
@@ -375,8 +452,8 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
method_slice =
grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method));
- call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice,
- host_slice_ptr,
+ call = grpc_channel_create_call(wrapper->bg_wrapped->channel, parent_call,
+ flags, cq, method_slice, host_slice_ptr,
grpc_rb_time_timeval(deadline,
/* absolute time */ 0),
NULL);
@@ -401,15 +478,16 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
}
/* Closes the channel, calling it's destroy method */
+/* Note this is an API-level call; a wrapped channel's finalizer doesn't call
+ * this */
static VALUE grpc_rb_channel_destroy(VALUE self) {
grpc_rb_channel *wrapper = NULL;
- grpc_channel *ch = NULL;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- ch = wrapper->wrapped;
- if (ch != NULL) {
- grpc_rb_channel_safe_destroy(wrapper);
- wrapper->wrapped = NULL;
+ if (wrapper->bg_wrapped != NULL) {
+ rb_thread_call_without_gvl(channel_safe_destroy_without_gil,
+ wrapper->bg_wrapped, NULL, NULL);
+ wrapper->bg_wrapped = NULL;
}
return Qnil;
@@ -422,64 +500,110 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
char *target = NULL;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- target = grpc_channel_get_target(wrapper->wrapped);
+ target = grpc_channel_get_target(wrapper->bg_wrapped->channel);
res = rb_str_new2(target);
gpr_free(target);
return res;
}
-// Either start polling channel connection state or signal that it's free to
-// destroy.
-// Not safe to call while a channel's connection state is polled.
-static void grpc_rb_channel_try_register_connection_polling(
- grpc_rb_channel *wrapper) {
- grpc_connectivity_state conn_state;
- gpr_timespec sleep_time = gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
-
- GPR_ASSERT(wrapper);
- GPR_ASSERT(wrapper->wrapped);
- gpr_mu_lock(&wrapper->channel_mu);
- if (wrapper->request_safe_destroy) {
- wrapper->safe_to_destroy = 1;
- gpr_cv_broadcast(&wrapper->channel_cv);
- gpr_mu_unlock(&wrapper->channel_mu);
- return;
+/* Needs to be called under global_connection_polling_mu */
+static int bg_watched_channel_list_lookup(bg_watched_channel *target) {
+ bg_watched_channel *cur = bg_watched_channel_list_head;
+
+ while (cur != NULL) {
+ if (cur == target) {
+ return 1;
+ }
+ cur = cur->next;
}
- gpr_mu_lock(&global_connection_polling_mu);
- GPR_ASSERT(channel_polling_thread_started || abort_channel_polling);
- conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
- if (conn_state != wrapper->current_connectivity_state) {
- wrapper->current_connectivity_state = conn_state;
- gpr_cv_broadcast(&wrapper->channel_cv);
- }
- // avoid posting work to the channel polling cq if it's been shutdown
- if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_channel_watch_connectivity_state(
- wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper);
- } else {
- wrapper->safe_to_destroy = 1;
- gpr_cv_broadcast(&wrapper->channel_cv);
+ return 0;
+}
+
+/* Needs to be called under global_connection_polling_mu */
+static bg_watched_channel *bg_watched_channel_list_create_and_add(
+ grpc_channel *channel) {
+ bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel));
+
+ watched->channel = channel;
+ watched->next = bg_watched_channel_list_head;
+ watched->refcount = 1;
+ bg_watched_channel_list_head = watched;
+ return watched;
+}
+
+/* Needs to be called under global_connection_polling_mu */
+static void bg_watched_channel_list_free_and_remove(
+ bg_watched_channel *target) {
+ bg_watched_channel *bg = NULL;
+
+ GPR_ASSERT(bg_watched_channel_list_lookup(target));
+ GPR_ASSERT(target->channel_destroyed && target->refcount == 0);
+ if (bg_watched_channel_list_head == target) {
+ bg_watched_channel_list_head = target->next;
+ gpr_free(target);
+ return;
+ }
+ bg = bg_watched_channel_list_head;
+ while (bg != NULL && bg->next != NULL) {
+ if (bg->next == target) {
+ bg->next = bg->next->next;
+ gpr_free(target);
+ return;
+ }
+ bg = bg->next;
}
+ GPR_ASSERT(0);
+}
+
+/* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push
+ * it onto the background thread for constant watches. */
+static void *channel_init_try_register_connection_polling_without_gil(
+ void *arg) {
+ channel_init_try_register_stack *stack =
+ (channel_init_try_register_stack *)arg;
+
+ gpr_mu_lock(&global_connection_polling_mu);
+ stack->wrapper->bg_wrapped =
+ bg_watched_channel_list_create_and_add(stack->channel);
+ grpc_rb_channel_try_register_connection_polling(stack->wrapper->bg_wrapped);
gpr_mu_unlock(&global_connection_polling_mu);
- gpr_mu_unlock(&wrapper->channel_mu);
+ return NULL;
}
-// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
-static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
- gpr_mu_lock(&wrapper->channel_mu);
- wrapper->request_safe_destroy = 1;
+// Needs to be called under global_connection_poolling_mu
+static void grpc_rb_channel_try_register_connection_polling(
+ bg_watched_channel *bg) {
+ grpc_connectivity_state conn_state;
+ watch_state_op *op = NULL;
- while (!wrapper->safe_to_destroy) {
- gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ GPR_ASSERT(channel_polling_thread_started || abort_channel_polling);
+
+ if (bg->refcount == 0) {
+ GPR_ASSERT(bg->channel_destroyed);
+ bg_watched_channel_list_free_and_remove(bg);
+ return;
+ }
+ GPR_ASSERT(bg->refcount == 1);
+ if (bg->channel_destroyed || abort_channel_polling) {
+ return;
}
- GPR_ASSERT(wrapper->safe_to_destroy);
- gpr_mu_unlock(&wrapper->channel_mu);
- grpc_channel_destroy(wrapper->wrapped);
+ conn_state = grpc_channel_check_connectivity_state(bg->channel, 0);
+ if (conn_state == GRPC_CHANNEL_SHUTDOWN) {
+ return;
+ }
+ GPR_ASSERT(bg_watched_channel_list_lookup(bg));
+ // prevent bg from being free'd by GC while background thread is watching it
+ bg->refcount++;
+
+ op = gpr_zalloc(sizeof(watch_state_op));
+ op->op_type = CONTINUOUS_WATCH;
+ op->op.continuous_watch_callback_args.bg = bg;
+ grpc_channel_watch_connectivity_state(bg->channel, conn_state,
+ gpr_inf_future(GPR_CLOCK_REALTIME),
+ channel_polling_cq, op);
}
// Note this loop breaks out with a single call of
@@ -490,6 +614,8 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
// early and falls back to current behavior.
static void *run_poll_channels_loop_no_gil(void *arg) {
grpc_event event;
+ watch_state_op *op = NULL;
+ bg_watched_channel *bg = NULL;
(void)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin");
@@ -505,10 +631,22 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
if (event.type == GRPC_QUEUE_SHUTDOWN) {
break;
}
+ gpr_mu_lock(&global_connection_polling_mu);
if (event.type == GRPC_OP_COMPLETE) {
- grpc_rb_channel_try_register_connection_polling(
- (grpc_rb_channel *)event.tag);
+ op = (watch_state_op *)event.tag;
+ if (op->op_type == CONTINUOUS_WATCH) {
+ bg = (bg_watched_channel *)op->op.continuous_watch_callback_args.bg;
+ bg->refcount--;
+ grpc_rb_channel_try_register_connection_polling(bg);
+ gpr_free(op);
+ } else if (op->op_type == WATCH_STATE_API) {
+ grpc_rb_channel_watch_connection_state_op_complete(
+ (watch_state_op *)event.tag, event.success);
+ } else {
+ GPR_ASSERT(0);
+ }
}
+ gpr_mu_unlock(&global_connection_polling_mu);
}
grpc_completion_queue_destroy(channel_polling_cq);
gpr_log(GPR_DEBUG,
@@ -519,14 +657,36 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
// Notify the channel polling loop to cleanup and shutdown.
static void run_poll_channels_loop_unblocking_func(void *arg) {
+ bg_watched_channel *bg = NULL;
(void)arg;
+
gpr_mu_lock(&global_connection_polling_mu);
gpr_log(GPR_DEBUG,
- "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting "
+ "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting "
"connection polling");
+ // early out after first time through
+ if (abort_channel_polling) {
+ gpr_mu_unlock(&global_connection_polling_mu);
+ return;
+ }
abort_channel_polling = 1;
+
+ // force pending watches to end by switching to shutdown state
+ bg = bg_watched_channel_list_head;
+ while (bg != NULL) {
+ if (!bg->channel_destroyed) {
+ grpc_channel_destroy(bg->channel);
+ bg->channel_destroyed = 1;
+ }
+ bg = bg->next;
+ }
+
grpc_completion_queue_shutdown(channel_polling_cq);
+ gpr_cv_broadcast(&global_connection_polling_cv);
gpr_mu_unlock(&global_connection_polling_mu);
+ gpr_log(GPR_DEBUG,
+ "GRPC_RUBY: run_poll_channels_loop_unblocking_func - end aborting "
+ "connection polling");
}
// Poll channel connectivity states in background thread without the GIL.
@@ -542,10 +702,11 @@ static VALUE run_poll_channels_loop(VALUE arg) {
}
static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
- (void)arg;
+ int *stop_waiting = (int *)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start");
gpr_mu_lock(&global_connection_polling_mu);
- while (!channel_polling_thread_started && !abort_channel_polling) {
+ while (!channel_polling_thread_started && !abort_channel_polling &&
+ !*stop_waiting) {
gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
@@ -556,15 +717,22 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
static void wait_until_channel_polling_thread_started_unblocking_func(
void *arg) {
- (void)arg;
+ int *stop_waiting = (int *)arg;
gpr_mu_lock(&global_connection_polling_mu);
gpr_log(GPR_DEBUG,
- "GRPC_RUBY: "
- "wait_until_channel_polling_thread_started_unblocking_func - begin "
- "aborting connection polling");
+ "GRPC_RUBY: interrupt wait for channel polling thread to start");
+ *stop_waiting = 1;
+ gpr_cv_broadcast(&global_connection_polling_cv);
+ gpr_mu_unlock(&global_connection_polling_mu);
+}
+
+static void *set_abort_channel_polling_without_gil(void *arg) {
+ (void)arg;
+ gpr_mu_lock(&global_connection_polling_mu);
abort_channel_polling = 1;
gpr_cv_broadcast(&global_connection_polling_cv);
gpr_mu_unlock(&global_connection_polling_mu);
+ return NULL;
}
/* Temporary fix for
@@ -592,10 +760,8 @@ void grpc_rb_channel_polling_thread_start() {
if (!RTEST(background_thread)) {
gpr_log(GPR_DEBUG, "GRPC_RUBY: failed to spawn channel polling thread");
- gpr_mu_lock(&global_connection_polling_mu);
- abort_channel_polling = 1;
- gpr_cv_broadcast(&global_connection_polling_cv);
- gpr_mu_unlock(&global_connection_polling_mu);
+ rb_thread_call_without_gvl(set_abort_channel_polling_without_gil, NULL,
+ NULL, NULL);
}
}
@@ -674,5 +840,5 @@ void Init_grpc_channel() {
grpc_channel *grpc_rb_get_wrapped_channel(VALUE v) {
grpc_rb_channel *wrapper = NULL;
TypedData_Get_Struct(v, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- return wrapper->wrapped;
+ return wrapper->bg_wrapped->channel;
}
diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c
index 9a3b56ddfb..71138265c8 100644
--- a/src/ruby/ext/grpc/rb_event_thread.c
+++ b/src/ruby/ext/grpc/rb_event_thread.c
@@ -105,16 +105,16 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) {
grpc_rb_event *event = NULL;
(void)param;
gpr_mu_lock(&event_queue.mu);
- while ((event = grpc_rb_event_queue_dequeue()) == NULL) {
- gpr_cv_wait(&event_queue.cv, &event_queue.mu,
- gpr_inf_future(GPR_CLOCK_REALTIME));
- if (event_queue.abort) {
+ while (!event_queue.abort) {
+ if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
gpr_mu_unlock(&event_queue.mu);
- return NULL;
+ return event;
}
+ gpr_cv_wait(&event_queue.cv, &event_queue.mu,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&event_queue.mu);
- return event;
+ return NULL;
}
static void grpc_rb_event_unblocking_func(void *arg) {
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 5be8861e0c..c319cd1391 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -292,11 +292,12 @@ static gpr_once g_once_init = GPR_ONCE_INIT;
static void grpc_ruby_once_init_internal() {
grpc_init();
- grpc_rb_event_queue_thread_start();
- grpc_rb_channel_polling_thread_start();
atexit(grpc_rb_shutdown);
}
+static VALUE bg_thread_init_rb_mu = Qundef;
+static int bg_thread_init_done = 0;
+
void grpc_ruby_once_init() {
/* ruby_vm_at_exit doesn't seem to be working. It would crash once every
* blue moon, and some users are getting it repeatedly. See the discussions
@@ -309,6 +310,18 @@ void grpc_ruby_once_init() {
* schedule our initialization and destruction only once.
*/
gpr_once_init(&g_once_init, grpc_ruby_once_init_internal);
+
+ // Avoid calling calling into ruby library (when creating threads here)
+ // in gpr_once_init. In general, it appears to be unsafe to call
+ // into the ruby library while holding a non-ruby mutex, because a gil yield
+ // could end up trying to lock onto that same mutex and deadlocking.
+ rb_mutex_lock(bg_thread_init_rb_mu);
+ if (!bg_thread_init_done) {
+ grpc_rb_event_queue_thread_start();
+ grpc_rb_channel_polling_thread_start();
+ bg_thread_init_done = 1;
+ }
+ rb_mutex_unlock(bg_thread_init_rb_mu);
}
void Init_grpc_c() {
@@ -317,6 +330,9 @@ void Init_grpc_c() {
return;
}
+ bg_thread_init_rb_mu = rb_mutex_new();
+ rb_global_variable(&bg_thread_init_rb_mu);
+
grpc_rb_mGRPC = rb_define_module("GRPC");
grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
grpc_rb_sNewServerRpc = rb_struct_define(
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 221a1e14ec..f6a4ff6795 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -103,6 +103,7 @@ grpc_alarm_create_type grpc_alarm_create_import;
grpc_alarm_cancel_type grpc_alarm_cancel_import;
grpc_alarm_destroy_type grpc_alarm_destroy_import;
grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import;
+grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import;
grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import;
grpc_channel_create_call_type grpc_channel_create_call_import;
grpc_channel_ping_type grpc_channel_ping_import;
@@ -270,6 +271,7 @@ gpr_histogram_get_contents_type gpr_histogram_get_contents_import;
gpr_histogram_merge_contents_type gpr_histogram_merge_contents_import;
gpr_join_host_port_type gpr_join_host_port_import;
gpr_split_host_port_type gpr_split_host_port_import;
+gpr_log_severity_string_type gpr_log_severity_string_import;
gpr_log_type gpr_log_import;
gpr_log_message_type gpr_log_message_import;
gpr_set_log_verbosity_type gpr_set_log_verbosity_import;
@@ -404,6 +406,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel");
grpc_alarm_destroy_import = (grpc_alarm_destroy_type) GetProcAddress(library, "grpc_alarm_destroy");
grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state");
+ grpc_channel_num_external_connectivity_watchers_import = (grpc_channel_num_external_connectivity_watchers_type) GetProcAddress(library, "grpc_channel_num_external_connectivity_watchers");
grpc_channel_watch_connectivity_state_import = (grpc_channel_watch_connectivity_state_type) GetProcAddress(library, "grpc_channel_watch_connectivity_state");
grpc_channel_create_call_import = (grpc_channel_create_call_type) GetProcAddress(library, "grpc_channel_create_call");
grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping");
@@ -571,6 +574,7 @@ void grpc_rb_load_imports(HMODULE library) {
gpr_histogram_merge_contents_import = (gpr_histogram_merge_contents_type) GetProcAddress(library, "gpr_histogram_merge_contents");
gpr_join_host_port_import = (gpr_join_host_port_type) GetProcAddress(library, "gpr_join_host_port");
gpr_split_host_port_import = (gpr_split_host_port_type) GetProcAddress(library, "gpr_split_host_port");
+ gpr_log_severity_string_import = (gpr_log_severity_string_type) GetProcAddress(library, "gpr_log_severity_string");
gpr_log_import = (gpr_log_type) GetProcAddress(library, "gpr_log");
gpr_log_message_import = (gpr_log_message_type) GetProcAddress(library, "gpr_log_message");
gpr_set_log_verbosity_import = (gpr_set_log_verbosity_type) GetProcAddress(library, "gpr_set_log_verbosity");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index f62b31e83d..0d64290b55 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -260,6 +260,9 @@ extern grpc_alarm_destroy_type grpc_alarm_destroy_import;
typedef grpc_connectivity_state(*grpc_channel_check_connectivity_state_type)(grpc_channel *channel, int try_to_connect);
extern grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import;
#define grpc_channel_check_connectivity_state grpc_channel_check_connectivity_state_import
+typedef int(*grpc_channel_num_external_connectivity_watchers_type)(grpc_channel *channel);
+extern grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import;
+#define grpc_channel_num_external_connectivity_watchers grpc_channel_num_external_connectivity_watchers_import
typedef void(*grpc_channel_watch_connectivity_state_type)(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag);
extern grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import;
#define grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state_import
@@ -761,6 +764,9 @@ extern gpr_join_host_port_type gpr_join_host_port_import;
typedef int(*gpr_split_host_port_type)(const char *name, char **host, char **port);
extern gpr_split_host_port_type gpr_split_host_port_import;
#define gpr_split_host_port gpr_split_host_port_import
+typedef const char *(*gpr_log_severity_string_type)(gpr_log_severity severity);
+extern gpr_log_severity_string_type gpr_log_severity_string_import;
+#define gpr_log_severity_string gpr_log_severity_string_import
typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);
extern gpr_log_type gpr_log_import;
#define gpr_log gpr_log_import
diff --git a/src/ruby/lib/grpc/grpc.rb b/src/ruby/lib/grpc/grpc.rb
index f46710dc74..48f2a45d44 100644
--- a/src/ruby/lib/grpc/grpc.rb
+++ b/src/ruby/lib/grpc/grpc.rb
@@ -34,6 +34,6 @@ begin
if File.directory?(distrib_lib_dir)
require_relative "#{distrib_lib_dir}/grpc_c"
else
- require_relative 'grpc_c'
+ require 'grpc/grpc_c'
end
end
diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb
index f30dff335f..e2e784d19f 100644
--- a/src/ruby/lib/grpc/version.rb
+++ b/src/ruby/lib/grpc/version.rb
@@ -29,5 +29,5 @@
# GRPC contains the General RPC module.
module GRPC
- VERSION = '1.4.0.dev'
+ VERSION = '1.5.0.dev'
end
diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb
index 940d68b9b0..c8a7856a09 100644
--- a/src/ruby/spec/channel_connection_spec.rb
+++ b/src/ruby/spec/channel_connection_spec.rb
@@ -28,6 +28,10 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
+require 'timeout'
+
+include Timeout
+include GRPC::Core
# A test message
class EchoMsg
@@ -62,7 +66,7 @@ end
EchoStub = EchoService.rpc_stub_class
def start_server(port = 0)
- @srv = GRPC::RpcServer.new
+ @srv = GRPC::RpcServer.new(pool_size: 1)
server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)
@srv.handle(EchoService)
@server_thd = Thread.new { @srv.run }
@@ -138,4 +142,32 @@ describe 'channel connection behavior' do
stop_server
end
+
+ it 'concurrent watches on the same channel' do
+ timeout(180) do
+ port = start_server
+ ch = GRPC::Core::Channel.new("localhost:#{port}", {},
+ :this_channel_is_insecure)
+ stop_server
+
+ thds = []
+ 50.times do
+ thds << Thread.new do
+ while ch.connectivity_state(true) != ConnectivityStates::READY
+ ch.watch_connectivity_state(
+ ConnectivityStates::READY, Time.now + 60)
+ break
+ end
+ end
+ end
+
+ sleep 0.01
+
+ start_server(port)
+
+ thds.each(&:join)
+
+ stop_server
+ end
+ end
end
diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb
index 1f8d4afb95..4f74238df7 100644
--- a/src/ruby/tools/version.rb
+++ b/src/ruby/tools/version.rb
@@ -29,6 +29,6 @@
module GRPC
module Tools
- VERSION = '1.4.0.dev'
+ VERSION = '1.5.0.dev'
end
end