author	Yuchen Zeng <zyc@google.com>	2017-06-06 13:26:36 -0700
committer	Yuchen Zeng <zyc@google.com>	2017-06-06 13:26:36 -0700
commit	b89b5011ecfe5135e03e98fb058e0ad345747007 (patch)
tree	c49d57c3c752150148840b672474ccfeb81d6294 /src/core
parent	4ebace71b071d9dd38a089c88b737b52fe57dfd5 (diff)
parent	b1332047d049d788a59284a561f4d6c2c2488792 (diff)
Merge remote-tracking branch 'upstream/master' into srv_record
Diffstat (limited to 'src/core')
-rw-r--r-- src/core/ext/census/intrusive_hash_map.c | 320
-rw-r--r-- src/core/ext/census/intrusive_hash_map.h | 167
-rw-r--r-- src/core/ext/census/intrusive_hash_map_internal.h | 63
-rw-r--r-- src/core/ext/filters/client_channel/channel_connectivity.c | 77
-rw-r--r-- src/core/ext/filters/client_channel/client_channel.c | 115
-rw-r--r-- src/core/ext/filters/client_channel/client_channel.h | 6
-rw-r--r-- src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c | 459
-rw-r--r-- src/core/ext/transport/chttp2/server/chttp2_server.c | 1
-rw-r--r-- src/core/lib/http/httpcli.c | 4
-rw-r--r-- src/core/lib/http/httpcli_security_connector.c | 4
-rw-r--r-- src/core/lib/security/transport/security_connector.c | 19
-rw-r--r-- src/core/lib/security/transport/security_handshaker.c | 220
12 files changed, 1034 insertions, 421 deletions
diff --git a/src/core/ext/census/intrusive_hash_map.c b/src/core/ext/census/intrusive_hash_map.c
new file mode 100644
index 0000000000..9f56b765e1
--- /dev/null
+++ b/src/core/ext/census/intrusive_hash_map.c
@@ -0,0 +1,320 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/census/intrusive_hash_map.h"
+#include <string.h>
+
+extern bool hm_index_compare(const hm_index *A, const hm_index *B);
+
+/* Simple hashing function that takes lower 32 bits. */
+static __inline uint32_t chunked_vector_hasher(uint64_t key) {
+ return (uint32_t)key;
+}
+
+/* Vector chunks are 1MiB divided by pointer size. */
+static const size_t VECTOR_CHUNK_SIZE = (1 << 20) / sizeof(void *);
+
+/* Helper functions which return buckets from the chunked vector. */
+static __inline void **get_mutable_bucket(const chunked_vector *buckets,
+ uint32_t index) {
+ if (index < VECTOR_CHUNK_SIZE) {
+ return &buckets->first_[index];
+ }
+ size_t rest_index = (index - VECTOR_CHUNK_SIZE) / VECTOR_CHUNK_SIZE;
+ return &buckets->rest_[rest_index][index % VECTOR_CHUNK_SIZE];
+}
+
+static __inline void *get_bucket(const chunked_vector *buckets,
+ uint32_t index) {
+ if (index < VECTOR_CHUNK_SIZE) {
+ return buckets->first_[index];
+ }
+ size_t rest_index = (index - VECTOR_CHUNK_SIZE) / VECTOR_CHUNK_SIZE;
+ return buckets->rest_[rest_index][index % VECTOR_CHUNK_SIZE];
+}
+
+/* Helper function. */
+static __inline size_t RestSize(const chunked_vector *vec) {
+ return (vec->size_ <= VECTOR_CHUNK_SIZE)
+ ? 0
+ : (vec->size_ - VECTOR_CHUNK_SIZE - 1) / VECTOR_CHUNK_SIZE + 1;
+}
+
+/* Initialize chunked vector to size of 0. */
+static void chunked_vector_init(chunked_vector *vec) {
+ vec->size_ = 0;
+ vec->first_ = NULL;
+ vec->rest_ = NULL;
+}
+
+/* Clear chunked vector and free all memory that has been allocated, then
+   reinitialize the chunked vector. */
+static void chunked_vector_clear(chunked_vector *vec) {
+ if (vec->first_ != NULL) {
+ gpr_free(vec->first_);
+ }
+ if (vec->rest_ != NULL) {
+ size_t rest_size = RestSize(vec);
+ for (size_t i = 0; i < rest_size; ++i) {
+ if (vec->rest_[i] != NULL) {
+ gpr_free(vec->rest_[i]);
+ }
+ }
+ gpr_free(vec->rest_);
+ }
+ chunked_vector_init(vec);
+}
+
+/* Clear chunked vector and then resize it to n entries. Allow the first 1MB to
+ be read w/o an extra cache miss. The rest of the elements are stored in an
+ array of arrays to avoid large mallocs. */
+static void chunked_vector_reset(chunked_vector *vec, size_t n) {
+ chunked_vector_clear(vec);
+ vec->size_ = n;
+ if (n <= VECTOR_CHUNK_SIZE) {
+ vec->first_ = (void **)gpr_malloc(sizeof(void *) * n);
+ memset(vec->first_, 0, sizeof(void *) * n);
+ } else {
+ vec->first_ = (void **)gpr_malloc(sizeof(void *) * VECTOR_CHUNK_SIZE);
+ memset(vec->first_, 0, sizeof(void *) * VECTOR_CHUNK_SIZE);
+ size_t rest_size = RestSize(vec);
+ vec->rest_ = (void ***)gpr_malloc(sizeof(void **) * rest_size);
+ memset(vec->rest_, 0, sizeof(void **) * rest_size);
+ int i = 0;
+ n -= VECTOR_CHUNK_SIZE;
+ while (n > 0) {
+ size_t this_size = GPR_MIN(n, VECTOR_CHUNK_SIZE);
+ vec->rest_[i] = (void **)gpr_malloc(sizeof(void *) * this_size);
+ memset(vec->rest_[i], 0, sizeof(void *) * this_size);
+ n -= this_size;
+ ++i;
+ }
+ }
+}
+
+void intrusive_hash_map_init(intrusive_hash_map *hash_map,
+ uint32_t initial_log2_table_size) {
+ hash_map->log2_num_buckets = initial_log2_table_size;
+ hash_map->num_items = 0;
+ uint32_t num_buckets = (uint32_t)1 << hash_map->log2_num_buckets;
+ hash_map->extend_threshold = num_buckets >> 1;
+ chunked_vector_init(&hash_map->buckets);
+ chunked_vector_reset(&hash_map->buckets, num_buckets);
+ hash_map->hash_mask = num_buckets - 1;
+}
+
+bool intrusive_hash_map_empty(const intrusive_hash_map *hash_map) {
+ return hash_map->num_items == 0;
+}
+
+size_t intrusive_hash_map_size(const intrusive_hash_map *hash_map) {
+ return hash_map->num_items;
+}
+
+void intrusive_hash_map_end(const intrusive_hash_map *hash_map, hm_index *idx) {
+ idx->bucket_index = (uint32_t)hash_map->buckets.size_;
+ GPR_ASSERT(idx->bucket_index <= UINT32_MAX);
+ idx->item = NULL;
+}
+
+void intrusive_hash_map_next(const intrusive_hash_map *hash_map,
+ hm_index *idx) {
+ idx->item = idx->item->hash_link;
+ while (idx->item == NULL) {
+ idx->bucket_index++;
+ if (idx->bucket_index >= hash_map->buckets.size_) {
+ /* Reached end of table. */
+ idx->item = NULL;
+ return;
+ }
+ idx->item = (hm_item *)get_bucket(&hash_map->buckets, idx->bucket_index);
+ }
+}
+
+void intrusive_hash_map_begin(const intrusive_hash_map *hash_map,
+ hm_index *idx) {
+ for (uint32_t i = 0; i < hash_map->buckets.size_; ++i) {
+ if (get_bucket(&hash_map->buckets, i) != NULL) {
+ idx->bucket_index = i;
+ idx->item = (hm_item *)get_bucket(&hash_map->buckets, i);
+ return;
+ }
+ }
+ intrusive_hash_map_end(hash_map, idx);
+}
+
+hm_item *intrusive_hash_map_find(const intrusive_hash_map *hash_map,
+ uint64_t key) {
+ uint32_t index = chunked_vector_hasher(key) & hash_map->hash_mask;
+
+ hm_item *p = (hm_item *)get_bucket(&hash_map->buckets, index);
+ while (p != NULL) {
+ if (key == p->key) {
+ return p;
+ }
+ p = p->hash_link;
+ }
+ return NULL;
+}
+
+hm_item *intrusive_hash_map_erase(intrusive_hash_map *hash_map, uint64_t key) {
+ uint32_t index = chunked_vector_hasher(key) & hash_map->hash_mask;
+
+ hm_item **slot = (hm_item **)get_mutable_bucket(&hash_map->buckets, index);
+ hm_item *p = *slot;
+ if (p == NULL) {
+ return NULL;
+ }
+
+ if (key == p->key) {
+ *slot = p->hash_link;
+ p->hash_link = NULL;
+ hash_map->num_items--;
+ return p;
+ }
+
+ hm_item *prev = p;
+ p = p->hash_link;
+
+ while (p) {
+ if (key == p->key) {
+ prev->hash_link = p->hash_link;
+ p->hash_link = NULL;
+ hash_map->num_items--;
+ return p;
+ }
+ prev = p;
+ p = p->hash_link;
+ }
+ return NULL;
+}
+
+/* Insert an hm_item* into the underlying chunked vector. hash_mask is
+ * array_size-1. Returns true if it is a new hm_item and false if the hm_item
+ * already existed.
+ */
+static __inline bool intrusive_hash_map_internal_insert(chunked_vector *buckets,
+ uint32_t hash_mask,
+ hm_item *item) {
+ const uint64_t key = item->key;
+ uint32_t index = chunked_vector_hasher(key) & hash_mask;
+ hm_item **slot = (hm_item **)get_mutable_bucket(buckets, index);
+ hm_item *p = *slot;
+ item->hash_link = p;
+
+ /* Check to see if key already exists. */
+ while (p) {
+ if (p->key == key) {
+ return false;
+ }
+ p = p->hash_link;
+ }
+
+ /* Otherwise add new entry. */
+ *slot = item;
+ return true;
+}
+
+/* Extend the allocated number of elements in the hash map by a factor of 2. */
+void intrusive_hash_map_extend(intrusive_hash_map *hash_map) {
+ uint32_t new_log2_num_buckets = 1 + hash_map->log2_num_buckets;
+ uint32_t new_num_buckets = (uint32_t)1 << new_log2_num_buckets;
+ GPR_ASSERT(new_num_buckets <= UINT32_MAX && new_num_buckets > 0);
+ chunked_vector new_buckets;
+ chunked_vector_init(&new_buckets);
+ chunked_vector_reset(&new_buckets, new_num_buckets);
+ uint32_t new_hash_mask = new_num_buckets - 1;
+
+ hm_index cur_idx;
+ hm_index end_idx;
+ intrusive_hash_map_end(hash_map, &end_idx);
+ intrusive_hash_map_begin(hash_map, &cur_idx);
+ while (!hm_index_compare(&cur_idx, &end_idx)) {
+ hm_item *new_item = cur_idx.item;
+ intrusive_hash_map_next(hash_map, &cur_idx);
+ intrusive_hash_map_internal_insert(&new_buckets, new_hash_mask, new_item);
+ }
+
+ /* Set values for new chunked_vector. extend_threshold is set to half of
+ * new_num_buckets. */
+ hash_map->log2_num_buckets = new_log2_num_buckets;
+ chunked_vector_clear(&hash_map->buckets);
+ hash_map->buckets = new_buckets;
+ hash_map->hash_mask = new_hash_mask;
+ hash_map->extend_threshold = new_num_buckets >> 1;
+}
+
+/* Insert a hm_item. The hm_item must remain live until it is removed from the
+   table. The table does not take ownership of the hm_item; the caller must
+   remove the hm_item from the table and delete it before the table itself is
+   deleted. If the hm_item already exists, num_items is not changed. */
+bool intrusive_hash_map_insert(intrusive_hash_map *hash_map, hm_item *item) {
+ if (hash_map->num_items >= hash_map->extend_threshold) {
+ intrusive_hash_map_extend(hash_map);
+ }
+ if (intrusive_hash_map_internal_insert(&hash_map->buckets,
+ hash_map->hash_mask, item)) {
+ hash_map->num_items++;
+ return true;
+ }
+ return false;
+}
+
+void intrusive_hash_map_clear(intrusive_hash_map *hash_map,
+ void (*free_object)(void *)) {
+ hm_index cur;
+ hm_index end;
+ intrusive_hash_map_end(hash_map, &end);
+ intrusive_hash_map_begin(hash_map, &cur);
+
+ while (!hm_index_compare(&cur, &end)) {
+ hm_index next = cur;
+ intrusive_hash_map_next(hash_map, &next);
+ if (cur.item != NULL) {
+ hm_item *item = intrusive_hash_map_erase(hash_map, cur.item->key);
+ (*free_object)((void *)item);
+ gpr_free(item);
+ }
+ cur = next;
+ }
+}
+
+void intrusive_hash_map_free(intrusive_hash_map *hash_map,
+ void (*free_object)(void *)) {
+ intrusive_hash_map_clear(hash_map, (*free_object));
+ hash_map->num_items = 0;
+ hash_map->extend_threshold = 0;
+ hash_map->log2_num_buckets = 0;
+ hash_map->hash_mask = 0;
+ chunked_vector_clear(&hash_map->buckets);
+}
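The chunked vector above addresses bucket i through a two-level scheme: the first VECTOR_CHUNK_SIZE slots live in a flat array (first_), and every index past that is split into a chunk number and an offset within that chunk (rest_). Below is a minimal standalone sketch of the same addressing, using plain calloc/free in place of gpr_malloc and a deliberately tiny chunk size so the arithmetic is easy to trace; the names here (vec, slot, CHUNK) are hypothetical and not part of the library.

#include <stdio.h>
#include <stdlib.h>

#define CHUNK 4 /* stand-in for VECTOR_CHUNK_SIZE; tiny so the math is traceable */

typedef struct {
  size_t size_;
  void **first_; /* first CHUNK slots, contiguous */
  void ***rest_; /* remaining slots, allocated CHUNK at a time */
} vec;

/* Mirrors get_mutable_bucket(): flat array below CHUNK, otherwise split
 * the index into a chunk number and an offset within that chunk. */
static void **slot(vec *v, size_t index) {
  if (index < CHUNK) return &v->first_[index];
  size_t rest_index = (index - CHUNK) / CHUNK;
  return &v->rest_[rest_index][index % CHUNK];
}

int main(void) {
  vec v;
  v.size_ = 10;
  v.first_ = calloc(CHUNK, sizeof(void *));
  size_t nrest = (v.size_ - CHUNK + CHUNK - 1) / CHUNK; /* 2 extra chunks */
  v.rest_ = calloc(nrest, sizeof(void **));
  for (size_t i = 0; i < nrest; i++) v.rest_[i] = calloc(CHUNK, sizeof(void *));
  int x = 42;
  *slot(&v, 9) = &x; /* index 9 -> rest_[1][1] */
  printf("%d\n", *(int *)*slot(&v, 9));
  for (size_t i = 0; i < nrest; i++) free(v.rest_[i]);
  free(v.rest_);
  free(v.first_);
  return 0;
}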
diff --git a/src/core/ext/census/intrusive_hash_map.h b/src/core/ext/census/intrusive_hash_map.h
new file mode 100644
index 0000000000..e316bf4b16
--- /dev/null
+++ b/src/core/ext/census/intrusive_hash_map.h
@@ -0,0 +1,167 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H
+#define GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H
+
+#include "src/core/ext/census/intrusive_hash_map_internal.h"
+
+/* intrusive_hash_map is a fast chained hash table. This hash map is faster than
+ * a dense hash map when the application calls insert and erase more often than
+ * find. When the workload is dominated by find() a dense hash map may be
+ * faster.
+ *
+ * intrusive_hash_map uses an intrusive header placed within a user-defined
+ * struct. The header field IHM_key MUST be set to a valid value before
+ * insertion into the hash map or undefined behavior may occur. The header
+ * field IHM_hash_link MUST be set to NULL initially.
+ *
+ * EXAMPLE USAGE:
+ *
+ * typedef struct string_item {
+ * INTRUSIVE_HASH_MAP_HEADER;
+ * // User data.
+ * char *str_buf;
+ * uint16_t len;
+ * } string_item;
+ *
+ * static string_item *make_string_item(uint64_t key, const char *buf,
+ * uint16_t len) {
+ * string_item *item = (string_item *)gpr_malloc(sizeof(string_item));
+ * item->IHM_key = key;
+ * item->IHM_hash_link = NULL;
+ * item->len = len;
+ * item->str_buf = (char *)malloc(len);
+ * memcpy(item->str_buf, buf, len);
+ * return item;
+ * }
+ *
+ * intrusive_hash_map hash_map;
+ * intrusive_hash_map_init(&hash_map, 4);
+ * string_item *new_item1 = make_string_item(10, "test1", 5);
+ * bool ok = intrusive_hash_map_insert(&hash_map, (hm_item *)new_item1);
+ *
+ * string_item *item1 =
+ * (string_item *)intrusive_hash_map_find(&hash_map, 10);
+ */
+
+/* Hash map item. Stores the key and a pointer to the actual object. A
+ * user-defined version of this can be passed in, provided the first two
+ * entries (key and hash_link) are the same. These entries must come first in
+ * the user-defined struct. A pointer to the struct must be cast to
+ * (hm_item *) when passed to the hash map. This is what makes the map
+ * intrusive. */
+typedef struct hm_item {
+ uint64_t key;
+ struct hm_item *hash_link;
+ /* Optional user defined data after this. */
+} hm_item;
+
+/* Macro provided for ease of use. This must come first in the user-defined
+ * struct (i.e. uint64_t key and hm_item * must be the first two elements, in
+ * that order). */
+#define INTRUSIVE_HASH_MAP_HEADER \
+ uint64_t IHM_key; \
+ struct hm_item *IHM_hash_link
+
+/* Index struct which acts as a pseudo-iterator within the hash map. */
+typedef struct hm_index {
+ uint32_t bucket_index; // hash map bucket index.
+ hm_item *item; // Pointer to hm_item within the hash map.
+} hm_index;
+
+/* Returns true if two hm_indices point to the same object within the hash map
+ * and false otherwise. */
+__inline bool hm_index_compare(const hm_index *A, const hm_index *B) {
+ return (A->item == B->item && A->bucket_index == B->bucket_index);
+}
+
+/*
+ * Helper functions for iterating over the hash map.
+ */
+
+/* On return, idx will contain an invalid index that is always equal to
+ * hash_map->buckets.size_. */
+void intrusive_hash_map_end(const intrusive_hash_map *hash_map, hm_index *idx);
+
+/* Iterates index to the next valid entry in the hash map and stores the
+ * index within idx. If end of table is reached, idx will contain the same
+ * values as if intrusive_hash_map_end() was called. */
+void intrusive_hash_map_next(const intrusive_hash_map *hash_map, hm_index *idx);
+
+/* On return, idx will contain the index of the first non-null entry in the hash
+ * map. If the hash map is empty, idx will contain the same values as if
+ * intrusive_hash_map_end() was called. */
+void intrusive_hash_map_begin(const intrusive_hash_map *hash_map,
+ hm_index *idx);
+
+/* Initialize intrusive hash map data structure. This must be called before
+ * the hash map can be used. The initial size of an intrusive hash map will be
+ * 2^initial_log2_map_size (valid range is [0, 31]). */
+void intrusive_hash_map_init(intrusive_hash_map *hash_map,
+ uint32_t initial_log2_map_size);
+
+/* Returns true if the hash map is empty and false otherwise. */
+bool intrusive_hash_map_empty(const intrusive_hash_map *hash_map);
+
+/* Returns the number of elements currently in the hash map. */
+size_t intrusive_hash_map_size(const intrusive_hash_map *hash_map);
+
+/* Find a hm_item within the hash map by key. Returns NULL if item was not
+ * found. */
+hm_item *intrusive_hash_map_find(const intrusive_hash_map *hash_map,
+ uint64_t key);
+
+/* Erase the hm_item that corresponds with key. If the hm_item is found, return
+ * the pointer to the hm_item. Else returns NULL. */
+hm_item *intrusive_hash_map_erase(intrusive_hash_map *hash_map, uint64_t key);
+
+/* Attempts to insert a new hm_item into the hash map. If an element with the
+ * same key already exists, it will not insert the new item and return false.
+ * Otherwise, it will insert the new item and return true. */
+bool intrusive_hash_map_insert(intrusive_hash_map *hash_map, hm_item *item);
+
+/* Clears the entire contents of the hash map, but leaves the internal data
+ * structure untouched. The second argument is a function pointer that will be
+ * called to free each user-defined object stored in the map. */
+void intrusive_hash_map_clear(intrusive_hash_map *hash_map,
+ void (*free_object)(void *));
+
+/* Erase all contents of the hash map and free its memory. The hash map is
+ * invalid after calling this function and cannot be used until it has been
+ * reinitialized (intrusive_hash_map_init()). The second argument is a
+ * function pointer that will be called to free each user-defined object
+ * stored in the map. */
+void intrusive_hash_map_free(intrusive_hash_map *hash_map,
+ void (*free_object)(void *));
+
+#endif /* GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H */
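The header describes hm_index as a pseudo-iterator but stops short of showing a traversal. A sketch of a full walk over the map, continuing the header's string_item example (make_string_item is the helper from that example; assumes <stdio.h> and <inttypes.h> are included; illustrative, not part of the library):

intrusive_hash_map hash_map;
intrusive_hash_map_init(&hash_map, 4);
intrusive_hash_map_insert(&hash_map, (hm_item *)make_string_item(10, "a", 1));
intrusive_hash_map_insert(&hash_map, (hm_item *)make_string_item(20, "b", 1));

/* Iterate from begin() until the index compares equal to end(). */
hm_index cur, end;
intrusive_hash_map_end(&hash_map, &end);
for (intrusive_hash_map_begin(&hash_map, &cur); !hm_index_compare(&cur, &end);
     intrusive_hash_map_next(&hash_map, &cur)) {
  string_item *item = (string_item *)cur.item;
  printf("key=%" PRIu64 " len=%u\n", item->IHM_key, (unsigned)item->len);
}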
diff --git a/src/core/ext/census/intrusive_hash_map_internal.h b/src/core/ext/census/intrusive_hash_map_internal.h
new file mode 100644
index 0000000000..76a9a3a722
--- /dev/null
+++ b/src/core/ext/census/intrusive_hash_map_internal.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H
+#define GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+#include <stdbool.h>
+
+/* The chunked vector is a data structure that allocates buckets for use in the
+ * hash map. ChunkedVector is logically equivalent to T*[N] (cast void* as
+ * T*). It's internally implemented as an array of 1MB arrays to avoid
+ * allocating large consecutive memory chunks. This is an internal data
+ * structure that should never be accessed directly. */
+typedef struct chunked_vector {
+ size_t size_;
+ void **first_;
+ void ***rest_;
+} chunked_vector;
+
+/* Core intrusive hash map data structure. All internal elements are managed by
+ * functions and should not be altered manually. */
+typedef struct intrusive_hash_map {
+ uint32_t num_items;
+ uint32_t extend_threshold;
+ uint32_t log2_num_buckets;
+ uint32_t hash_mask;
+ chunked_vector buckets;
+} intrusive_hash_map;
+
+#endif /* GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H */
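Because the bucket count is always a power of two, the map keeps three derived fields of this struct in sync (see intrusive_hash_map_init and intrusive_hash_map_extend in the .c file): hash_mask turns the bucket lookup into a bitwise AND instead of a modulo, and extend_threshold triggers growth once the table is half full. A short illustration of those invariants for initial_log2_table_size == 4:

uint32_t log2_num_buckets = 4;
uint32_t num_buckets = (uint32_t)1 << log2_num_buckets; /* 16 buckets */
uint32_t hash_mask = num_buckets - 1;                   /* 0xF        */
uint32_t extend_threshold = num_buckets >> 1;           /* grow at 8  */

/* chunked_vector_hasher keeps the low 32 bits of the key; the mask
 * then selects the bucket, equivalent to (key % num_buckets). */
uint64_t key = 0x12345678abcdef01ull;
uint32_t bucket = (uint32_t)key & hash_mask; /* == 1 */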
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index f83670db82..04666edbec 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -67,9 +67,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
typedef enum {
WAITING,
- CALLING_BACK,
+ READY_TO_CALL_BACK,
CALLING_BACK_AND_FINISHED,
- CALLED_BACK
} callback_phase;
typedef struct {
@@ -77,11 +76,13 @@ typedef struct {
callback_phase phase;
grpc_closure on_complete;
grpc_closure on_timeout;
+ grpc_closure watcher_timer_init;
grpc_timer alarm;
grpc_connectivity_state state;
grpc_completion_queue *cq;
grpc_cq_completion completion_storage;
grpc_channel *channel;
+ grpc_error *error;
void *tag;
} state_watcher;
@@ -105,11 +106,8 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
gpr_mu_lock(&w->mu);
switch (w->phase) {
case WAITING:
- case CALLED_BACK:
+ case READY_TO_CALL_BACK:
GPR_UNREACHABLE_CODE(return );
- case CALLING_BACK:
- w->phase = CALLED_BACK;
- break;
case CALLING_BACK_AND_FINISHED:
delete = 1;
break;
@@ -123,10 +121,14 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
bool due_to_completion, grpc_error *error) {
- int delete = 0;
-
if (due_to_completion) {
grpc_timer_cancel(exec_ctx, &w->alarm);
+ } else {
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ grpc_client_channel_watch_connectivity_state(exec_ctx, client_channel_elem,
+ grpc_cq_pollset(w->cq), NULL,
+ &w->on_complete, NULL);
}
gpr_mu_lock(&w->mu);
@@ -147,25 +149,27 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
}
switch (w->phase) {
case WAITING:
- w->phase = CALLING_BACK;
- grpc_cq_end_op(exec_ctx, w->cq, w->tag, GRPC_ERROR_REF(error),
- finished_completion, w, &w->completion_storage);
+ GRPC_ERROR_REF(error);
+ w->error = error;
+ w->phase = READY_TO_CALL_BACK;
break;
- case CALLING_BACK:
+ case READY_TO_CALL_BACK:
+ if (error != GRPC_ERROR_NONE) {
+ GPR_ASSERT(!due_to_completion);
+ GRPC_ERROR_UNREF(w->error);
+ GRPC_ERROR_REF(error);
+ w->error = error;
+ }
w->phase = CALLING_BACK_AND_FINISHED;
+ grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w,
+ &w->completion_storage);
break;
case CALLING_BACK_AND_FINISHED:
GPR_UNREACHABLE_CODE(return );
- case CALLED_BACK:
- delete = 1;
break;
}
gpr_mu_unlock(&w->mu);
- if (delete) {
- delete_state_watcher(exec_ctx, w);
- }
-
GRPC_ERROR_UNREF(error);
}
@@ -179,6 +183,28 @@ static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error));
}
+int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) {
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ return grpc_client_channel_num_external_connectivity_watchers(
+ client_channel_elem);
+}
+
+typedef struct watcher_timer_init_arg {
+ state_watcher *w;
+ gpr_timespec deadline;
+} watcher_timer_init_arg;
+
+static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error_ignored) {
+ watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg;
+
+ grpc_timer_init(exec_ctx, &wa->w->alarm,
+ gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC),
+ &wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_free(wa);
+}
+
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
@@ -208,16 +234,19 @@ void grpc_channel_watch_connectivity_state(
w->cq = cq;
w->tag = tag;
w->channel = channel;
+ w->error = NULL;
- grpc_timer_init(&exec_ctx, &w->alarm,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- &w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
+ watcher_timer_init_arg *wa = gpr_malloc(sizeof(watcher_timer_init_arg));
+ wa->w = w;
+ wa->deadline = deadline;
+ grpc_closure_init(&w->watcher_timer_init, watcher_timer_init, wa,
+ grpc_schedule_on_exec_ctx);
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
- grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
- grpc_cq_pollset(cq), &w->state,
- &w->on_complete);
+ grpc_client_channel_watch_connectivity_state(
+ &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state,
+ &w->on_complete, &w->watcher_timer_init);
} else {
abort();
}
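The rewritten watcher drops the CALLING_BACK/CALLED_BACK split: whichever of the completion and the timeout arrives first only parks its error and moves to READY_TO_CALL_BACK; the second arrival enqueues the completion and moves to CALLING_BACK_AND_FINISHED, where finished_completion deletes the watcher. An illustrative sketch of that reduced phase machine (plain C, not grpc API; the error is represented as an int for brevity):

typedef enum { WAITING, READY_TO_CALL_BACK, CALLING_BACK_AND_FINISHED } phase;

static void partly_done_sketch(phase *p, int *error_slot, int error) {
  switch (*p) {
    case WAITING: /* first of the two events: park the error */
      *error_slot = error;
      *p = READY_TO_CALL_BACK;
      break;
    case READY_TO_CALL_BACK: /* second event: fire the callback */
      *p = CALLING_BACK_AND_FINISHED;
      /* ...enqueue the completion carrying *error_slot... */
      break;
    case CALLING_BACK_AND_FINISHED:
      /* unreachable: both events were already seen */
      break;
  }
}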
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index f2f27b9175..8cebbe9eca 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -167,6 +167,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) {
return value;
}
+struct external_connectivity_watcher;
+
/*************************************************************************
* CHANNEL-WIDE FUNCTIONS
*/
@@ -204,6 +206,11 @@ typedef struct client_channel_channel_data {
/** interested parties (owned) */
grpc_pollset_set *interested_parties;
+ /* external_connectivity_watcher_list head is guarded by its own mutex, since
+ * counts need to be grabbed immediately without polling on a cq */
+ gpr_mu external_connectivity_watcher_list_mu;
+ struct external_connectivity_watcher *external_connectivity_watcher_list_head;
+
/* the following properties are guarded by a mutex since API's require them
to be instantaneously available */
gpr_mu info_mu;
@@ -661,6 +668,12 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
// Initialize data members.
chand->combiner = grpc_combiner_create(NULL);
gpr_mu_init(&chand->info_mu);
+ gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
+
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ chand->external_connectivity_watcher_list_head = NULL;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+
chand->owning_stack = args->channel_stack;
grpc_closure_init(&chand->on_resolver_result_changed,
on_resolver_result_changed_locked, chand,
@@ -749,6 +762,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
gpr_mu_destroy(&chand->info_mu);
+ gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}
/*************************************************************************
@@ -1431,14 +1445,79 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
return out;
}
-typedef struct {
+typedef struct external_connectivity_watcher {
channel_data *chand;
grpc_pollset *pollset;
grpc_closure *on_complete;
+ grpc_closure *watcher_timer_init;
grpc_connectivity_state *state;
grpc_closure my_closure;
+ struct external_connectivity_watcher *next;
} external_connectivity_watcher;
+static external_connectivity_watcher *lookup_external_connectivity_watcher(
+ channel_data *chand, grpc_closure *on_complete) {
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL && w->on_complete != on_complete) {
+ w = w->next;
+ }
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return w;
+}
+
+static void external_connectivity_watcher_list_append(
+ channel_data *chand, external_connectivity_watcher *w) {
+ GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
+
+ gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
+ GPR_ASSERT(!w->next);
+ w->next = chand->external_connectivity_watcher_list_head;
+ chand->external_connectivity_watcher_list_head = w;
+ gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
+}
+
+static void external_connectivity_watcher_list_remove(
+    channel_data *chand, external_connectivity_watcher *to_remove) {
+  GPR_ASSERT(
+      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
+  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+  if (to_remove == chand->external_connectivity_watcher_list_head) {
+    chand->external_connectivity_watcher_list_head = to_remove->next;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return;
+ }
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL) {
+    if (w->next == to_remove) {
+ w->next = w->next->next;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return;
+ }
+ w = w->next;
+ }
+ GPR_UNREACHABLE_CODE(return );
+}
+
+int grpc_client_channel_num_external_connectivity_watchers(
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ int count = 0;
+
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL) {
+ count++;
+ w = w->next;
+ }
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+
+ return count;
+}
+
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
external_connectivity_watcher *w = arg;
@@ -1447,6 +1526,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
w->pollset);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
+ external_connectivity_watcher_list_remove(w->chand, w);
gpr_free(w);
grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
}
@@ -1454,21 +1534,42 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
external_connectivity_watcher *w = arg;
- grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
- grpc_schedule_on_exec_ctx);
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
+ external_connectivity_watcher *found = NULL;
+ if (w->state != NULL) {
+ external_connectivity_watcher_list_append(w->chand, w);
+ grpc_closure_run(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
+ grpc_schedule_on_exec_ctx);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
+ } else {
+ GPR_ASSERT(w->watcher_timer_init == NULL);
+ found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
+ if (found) {
+ GPR_ASSERT(found->on_complete == w->on_complete);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
+ }
+ grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
+ w->pollset);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
+ "external_connectivity_watcher");
+ gpr_free(w);
+ }
}
void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *closure) {
+ grpc_connectivity_state *state, grpc_closure *closure,
+ grpc_closure *watcher_timer_init) {
channel_data *chand = elem->channel_data;
- external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
w->chand = chand;
w->pollset = pollset;
w->on_complete = closure;
w->state = state;
+ w->watcher_timer_init = watcher_timer_init;
+
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index 8d2490ea55..356a7ab0c1 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -53,9 +53,13 @@ extern const grpc_channel_filter grpc_client_channel_filter;
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
+int grpc_client_channel_num_external_connectivity_watchers(
+ grpc_channel_element *elem);
+
void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *on_complete);
+ grpc_connectivity_state *state, grpc_closure *on_complete,
+ grpc_closure *watcher_timer_init);
/* Debug helper: pull the subchannel call from a call stack element */
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 6e7f410635..7ee6ffb787 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -99,26 +99,13 @@ typedef struct pending_pick {
grpc_closure *on_complete;
} pending_pick;
-/** List of subchannels in a connectivity READY state */
-typedef struct ready_list {
- grpc_subchannel *subchannel;
- /* references namesake entry in subchannel_data */
- void *user_data;
- struct ready_list *next;
- struct ready_list *prev;
-} ready_list;
-
typedef struct {
- /** index within policy->subchannels */
- size_t index;
/** backpointer to owning policy */
round_robin_lb_policy *policy;
/** subchannel itself */
grpc_subchannel *subchannel;
/** notification that connectivity has changed on subchannel */
grpc_closure connectivity_changed_closure;
- /** this subchannels current position in subchannel->ready_list */
- ready_list *ready_list_node;
/** last observed connectivity. Not updated by
* \a grpc_subchannel_notify_on_state_change. Used to determine the previous
* state while processing the new state in \a rr_connectivity_changed */
@@ -126,6 +113,10 @@ typedef struct {
/** current connectivity state. Updated by \a
* grpc_subchannel_notify_on_state_change */
grpc_connectivity_state curr_connectivity_state;
+ /** connectivity state to be updated by the watcher, not guarded by
+ * the combiner. Will be moved to curr_connectivity_state inside of
+ * the combiner by rr_connectivity_changed_locked(). */
+ grpc_connectivity_state pending_connectivity_state_unsafe;
/** the subchannel's target user data */
void *user_data;
/** vtable to operate over \a user_data */
@@ -141,182 +132,106 @@ struct round_robin_lb_policy {
/** all our subchannels */
size_t num_subchannels;
- subchannel_data **subchannels;
+ subchannel_data *subchannels;
- /** how many subchannels are in TRANSIENT_FAILURE */
+ /** how many subchannels are in state READY */
+ size_t num_ready;
+ /** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
- /** how many subchannels are IDLE */
+ /** how many subchannels are in state IDLE */
size_t num_idle;
/** have we started picking? */
- int started_picking;
+ bool started_picking;
/** are we shutting down? */
- int shutdown;
+ bool shutdown;
/** List of picks that are waiting on connectivity */
pending_pick *pending_picks;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
- /** (Dummy) root of the doubly linked list containing READY subchannels */
- ready_list ready_list;
- /** Last pick from the ready list. */
- ready_list *ready_list_last_pick;
+ // Index into subchannels for last pick.
+ size_t last_ready_subchannel_index;
};
-/** Returns the next subchannel from the connected list or NULL if the list is
- * empty.
+/** Returns the index into p->subchannels of the next subchannel in
+ * READY state, or p->num_subchannels if no subchannel is READY.
*
- * Note that this function does *not* advance p->ready_list_last_pick. Use \a
- * advance_last_picked_locked() for that. */
-static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) {
- ready_list *selected;
- selected = p->ready_list_last_pick->next;
-
- while (selected != NULL) {
- if (selected == &p->ready_list) {
- GPR_ASSERT(selected->subchannel == NULL);
- /* skip dummy root */
- selected = selected->next;
- } else {
- GPR_ASSERT(selected->subchannel != NULL);
- return selected;
- }
+ * Note that this function does *not* update p->last_ready_subchannel_index.
+ * The caller must do that if it returns a pick. */
+static size_t get_next_ready_subchannel_index_locked(
+ const round_robin_lb_policy *p) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO,
+ "[RR: %p] getting next ready subchannel, "
+ "last_ready_subchannel_index=%lu",
+ p, (unsigned long)p->last_ready_subchannel_index);
}
- return NULL;
-}
-
-/** Advance the \a ready_list picking head. */
-static void advance_last_picked_locked(round_robin_lb_policy *p) {
- if (p->ready_list_last_pick->next != NULL) { /* non-empty list */
- p->ready_list_last_pick = p->ready_list_last_pick->next;
- if (p->ready_list_last_pick == &p->ready_list) {
- /* skip dummy root */
- p->ready_list_last_pick = p->ready_list_last_pick->next;
+ for (size_t i = 0; i < p->num_subchannels; ++i) {
+ const size_t index =
+ (i + p->last_ready_subchannel_index + 1) % p->num_subchannels;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p,
+ (unsigned long)index,
+ p->subchannels[index].curr_connectivity_state);
+ }
+ if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu",
+ p, (unsigned long)index);
+ }
+ return index;
}
- } else { /* should be an empty list */
- GPR_ASSERT(p->ready_list_last_pick == &p->ready_list);
}
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, "
- "CSC %p)",
- (void *)p, (void *)p->ready_list_last_pick,
- (void *)p->ready_list_last_pick->subchannel,
- (void *)grpc_subchannel_get_connected_subchannel(
- p->ready_list_last_pick->subchannel));
+ gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p);
}
+ return p->num_subchannels;
}
-/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
- * csc to the list of ready subchannels. */
-static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- subchannel_data *sd) {
- ready_list *new_elem = gpr_zalloc(sizeof(ready_list));
- new_elem->subchannel = sd->subchannel;
- new_elem->user_data = sd->user_data;
- if (p->ready_list.prev == NULL) {
- /* first element */
- new_elem->next = &p->ready_list;
- new_elem->prev = &p->ready_list;
- p->ready_list.next = new_elem;
- p->ready_list.prev = new_elem;
- } else {
- new_elem->next = &p->ready_list;
- new_elem->prev = p->ready_list.prev;
- p->ready_list.prev->next = new_elem;
- p->ready_list.prev = new_elem;
- }
+// Sets p->last_ready_subchannel_index to last_ready_index.
+static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
+ size_t last_ready_index) {
+ GPR_ASSERT(last_ready_index < p->num_subchannels);
+ p->last_ready_subchannel_index = last_ready_index;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
- (void *)new_elem, (void *)sd->subchannel);
- }
- return new_elem;
-}
-
-/** Removes \a node from the list of connected subchannels */
-static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
- ready_list *node) {
- if (node == NULL) {
- return;
- }
- if (node == p->ready_list_last_pick) {
- p->ready_list_last_pick = p->ready_list_last_pick->prev;
- }
-
- /* removing last item */
- if (node->next == &p->ready_list && node->prev == &p->ready_list) {
- GPR_ASSERT(p->ready_list.next == node);
- GPR_ASSERT(p->ready_list.prev == node);
- p->ready_list.next = NULL;
- p->ready_list.prev = NULL;
- } else {
- node->prev->next = node->next;
- node->next->prev = node->prev;
- }
-
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
- (void *)node->subchannel);
+ gpr_log(GPR_DEBUG,
+ "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
+ (void *)p, (unsigned long)last_ready_index,
+ (void *)p->subchannels[last_ready_index].subchannel,
+ (void *)grpc_subchannel_get_connected_subchannel(
+ p->subchannels[last_ready_index].subchannel));
}
-
- node->next = NULL;
- node->prev = NULL;
- node->subchannel = NULL;
-
- gpr_free(node);
-}
-
-static bool is_ready_list_empty(round_robin_lb_policy *p) {
- return p->ready_list.prev == NULL;
}
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- ready_list *elem;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol);
}
-
for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
- if (sd->user_data != NULL) {
- GPR_ASSERT(sd->user_data_vtable != NULL);
- sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ subchannel_data *sd = &p->subchannels[i];
+ if (sd->subchannel != NULL) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ }
}
- gpr_free(sd);
}
-
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
-
- elem = p->ready_list.next;
- while (elem != NULL && elem != &p->ready_list) {
- ready_list *tmp;
- tmp = elem->next;
- elem->next = NULL;
- elem->prev = NULL;
- elem->subchannel = NULL;
- gpr_free(elem);
- elem = tmp;
- }
-
gpr_free(p);
}
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- size_t i;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol);
}
-
- p->shutdown = 1;
+ p->shutdown = true;
+ pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
@@ -328,10 +243,13 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
- for (i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
- &sd->connectivity_changed_closure);
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ subchannel_data *sd = &p->subchannels[i];
+ if (sd->subchannel != NULL) {
+ grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
+ NULL,
+ &sd->connectivity_changed_closure);
+ }
}
}
@@ -339,8 +257,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- pp = p->pending_picks;
+ pending_pick *pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
@@ -364,8 +281,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- pp = p->pending_picks;
+ pending_pick *pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
@@ -387,21 +303,16 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) {
- size_t i;
- p->started_picking = 1;
-
- for (i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- /* use some sentinel value outside of the range of grpc_connectivity_state
- * to signal an undefined previous state. We won't be referring to this
- * value again and it'll be overwritten after the first call to
- * rr_connectivity_changed */
- sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
- GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
+ p->started_picking = true;
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ subchannel_data *sd = &p->subchannels[i];
+ if (sd->subchannel != NULL) {
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
+ }
}
}
@@ -418,36 +329,32 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- ready_list *selected;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
}
-
- if ((selected = peek_next_connected_locked(p))) {
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->num_subchannels) {
/* readily available, report right away */
+ subchannel_data *sd = &p->subchannels[next_ready_index];
*target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "rr_picked");
-
+ grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked");
if (user_data != NULL) {
- *user_data = selected->user_data;
+ *user_data = sd->user_data;
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
- (void *)*target, (void *)selected);
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)",
+ (void *)*target, (unsigned long)next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
- advance_last_picked_locked(p);
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
return 1;
} else {
/* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking_locked(exec_ctx, p);
}
- pp = gpr_malloc(sizeof(*pp));
+ pending_pick *pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->target = target;
pp->on_complete = on_complete;
@@ -458,25 +365,31 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
-static void update_state_counters(subchannel_data *sd) {
+static void update_state_counters_locked(subchannel_data *sd) {
round_robin_lb_policy *p = sd->policy;
-
- /* update p->num_transient_failures (resp. p->num_idle): if the previous
- * state was TRANSIENT_FAILURE (resp. IDLE), decrement
- * p->num_transient_failures (resp. p->num_idle). */
- if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
+ GPR_ASSERT(p->num_ready > 0);
+ --p->num_ready;
+ } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(p->num_transient_failures > 0);
--p->num_transient_failures;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(p->num_idle > 0);
--p->num_idle;
}
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ ++p->num_ready;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ ++p->num_transient_failures;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
+ ++p->num_idle;
+ }
}
/* sd is the subchannel_data associated with the updated subchannel.
* shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE
* or SHUTDOWN */
-static grpc_connectivity_state update_lb_connectivity_status(
+static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
@@ -498,7 +411,7 @@ static grpc_connectivity_state update_lb_connectivity_status(
* CHECK: p->num_idle == p->num_subchannels.
*/
round_robin_lb_policy *p = sd->policy;
- if (!is_ready_list_empty(p)) { /* 1) READY */
+ if (p->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
return GRPC_CHANNEL_READY;
@@ -532,32 +445,62 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
- pending_pick *pp;
-
- GRPC_ERROR_REF(error);
-
+ // Now that we're inside the combiner, copy the pending connectivity
+ // state (which was set by the connectivity state watcher) to
+ // curr_connectivity_state, which is what we use inside of the combiner.
+ sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] connectivity changed for subchannel %p: "
+ "prev_state=%d new_state=%d",
+ p, sd->subchannel, sd->prev_connectivity_state,
+ sd->curr_connectivity_state);
+ }
+ // If we're shutting down, unref and return.
if (p->shutdown) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- GRPC_ERROR_UNREF(error);
return;
}
- switch (sd->curr_connectivity_state) {
- case GRPC_CHANNEL_INIT:
- GPR_UNREACHABLE_CODE(return );
- case GRPC_CHANNEL_READY:
- /* add the newly connected subchannel to the list of connected ones.
- * Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd);
+ // Update state counters and determine new overall state.
+ update_state_counters_locked(sd);
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
+ grpc_connectivity_state new_connectivity_state =
+ update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
+ // If the new state is SHUTDOWN, unref the subchannel, and if the new
+ // overall state is SHUTDOWN, clean up.
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
+ sd->subchannel = NULL;
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ }
+ if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ /* the policy is shutting down. Flush all the pending picks... */
+ pending_pick *pp;
+ while ((pp = p->pending_picks)) {
+ p->pending_picks = pp->next;
+ *pp->target = NULL;
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ gpr_free(pp);
+ }
+ }
+ /* unref the "rr_connectivity" weak ref from start_picking */
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
+ } else {
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
 * p->pending_picks. This preemptively replicates rr_pick()'s actions. */
- ready_list *selected = peek_next_connected_locked(p);
- GPR_ASSERT(selected != NULL);
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ GPR_ASSERT(next_ready_index < p->num_subchannels);
+ subchannel_data *selected = &p->subchannels[next_ready_index];
if (p->pending_picks != NULL) {
/* if the selected subchannel is going to be used for the pending
* picks, update the last picked pointer */
- advance_last_picked_locked(p);
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
}
+ pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
@@ -568,72 +511,20 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
- "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- (void *)selected->subchannel, (void *)selected);
+ "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %lu)",
+ (void *)selected->subchannel,
+ (unsigned long)next_ready_index);
}
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_IDLE:
- ++p->num_idle;
- /* fallthrough */
- case GRPC_CHANNEL_CONNECTING:
- update_state_counters(sd);
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- ++p->num_transient_failures;
- /* remove from ready list if still present */
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_SHUTDOWN:
- update_state_counters(sd);
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
- --p->num_subchannels;
- GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
- p->subchannels[p->num_subchannels]);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
- p->subchannels[sd->index]->index = sd->index;
- if (update_lb_connectivity_status(exec_ctx, sd, error) ==
- GRPC_CHANNEL_SHUTDOWN) {
- /* the policy is shutting down. Flush all the pending picks... */
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
- gpr_free(pp);
- }
- }
- gpr_free(sd);
- /* unref the "rr_connectivity" weak ref from start_picking */
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- break;
+ }
+ /* renew notification: reuses the "rr_connectivity" weak ref */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
- GRPC_ERROR_UNREF(error);
}
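
The rewrite above drops the intrusive ready-list in favor of plain index arithmetic over the p->subchannels array. As a minimal standalone model of the wraparound scan that get_next_ready_subchannel_index_locked presumably performs (toy names and fields, not the actual gRPC definitions):

    #include <stdbool.h>
    #include <stddef.h>

    typedef struct {
      bool ready; /* stands in for curr_connectivity_state == GRPC_CHANNEL_READY */
    } toy_subchannel;

    /* Scan forward from the slot after the last pick, wrapping around;
     * return n (one past the end) when nothing is ready. */
    static size_t next_ready_index(const toy_subchannel *subs, size_t n,
                                   size_t last_picked) {
      for (size_t i = 0; i < n; ++i) {
        size_t idx = (last_picked + 1 + i) % n;
        if (subs[idx].ready) return idx;
      }
      return n;
    }

Returning n in the "nothing ready" case is what the GPR_ASSERT(next_ready_index < p->num_subchannels) guards above check for, and initializing the last-pick index to num_subchannels - 1 (as round_robin_create does further down) makes the very first scan start at index 0.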
static grpc_connectivity_state rr_check_connectivity_locked(
@@ -654,10 +545,10 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- ready_list *selected;
- grpc_connected_subchannel *target;
- if ((selected = peek_next_connected_locked(p))) {
- target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->num_subchannels) {
+ subchannel_data *selected = &p->subchannels[next_ready_index];
+ grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
"rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure);
@@ -708,7 +599,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
- size_t subchannel_idx = 0;
+ size_t subchannel_index = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */
if (addresses->addresses[i].is_balancer) continue;
@@ -727,42 +618,44 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG, "Created subchannel %p for address uri %s",
- (void *)subchannel, address_uri);
+ gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s",
+ (unsigned long)subchannel_index, (void *)subchannel, address_uri);
gpr_free(address_uri);
}
grpc_channel_args_destroy(exec_ctx, new_args);
if (subchannel != NULL) {
- subchannel_data *sd = gpr_zalloc(sizeof(*sd));
- p->subchannels[subchannel_idx] = sd;
+ subchannel_data *sd = &p->subchannels[subchannel_index];
sd->policy = p;
- sd->index = subchannel_idx;
sd->subchannel = subchannel;
+ /* use some sentinel value outside of the range of grpc_connectivity_state
+ * to signal an undefined previous state. We won't be referring to this
+ * value again and it'll be overwritten after the first call to
+ * rr_connectivity_changed */
+ sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
+ sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
sd->user_data_vtable = addresses->user_data_vtable;
if (sd->user_data_vtable != NULL) {
sd->user_data =
sd->user_data_vtable->copy(addresses->addresses[i].user_data);
}
- ++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed_locked, sd,
grpc_combiner_scheduler(args->combiner, false));
+ ++subchannel_index;
}
}
- if (subchannel_idx == 0) {
+ if (subchannel_index == 0) {
/* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels);
gpr_free(p);
return NULL;
}
- p->num_subchannels = subchannel_idx;
+ p->num_subchannels = subchannel_index;
- /* The (dummy node) root of the ready list */
- p->ready_list.subchannel = NULL;
- p->ready_list.prev = NULL;
- p->ready_list.next = NULL;
- p->ready_list_last_pick = &p->ready_list;
+ // Initialize the last pick index to the last subchannel, so that the
+ // first pick will start at the beginning of the list.
+ p->last_ready_subchannel_index = subchannel_index - 1;
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
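
The GRPC_CHANNEL_INIT sentinel assigned to prev_connectivity_state above relies on a common C idiom: reserve a value outside the enum's reported range to mean "no state recorded yet". A self-contained sketch of the idiom (toy enum, not grpc_connectivity_state itself):

    #include <assert.h>

    typedef enum {
      TOY_STATE_INIT = -1, /* sentinel: never reported by the API */
      TOY_STATE_IDLE = 0,
      TOY_STATE_READY = 1
    } toy_state;

    int main(void) {
      toy_state prev = TOY_STATE_INIT; /* "undefined previous state" */
      toy_state curr = TOY_STATE_IDLE; /* first real notification */
      assert(prev != curr);            /* the sentinel cannot collide */
      prev = curr;                     /* overwritten on first update */
      (void)prev;
      return 0;
    }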
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
index b9c62c376a..2c076e821c 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.c
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -127,6 +127,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
gpr_mu_lock(&state->mu);
if (state->shutdown) {
gpr_mu_unlock(&state->mu);
+ grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_NONE);
grpc_endpoint_destroy(exec_ctx, tcp);
gpr_free(acceptor);
return;
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index 0ac2c2ad52..7012ffe568 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -105,7 +105,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
grpc_error *error) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent,
req->context->pollset_set);
- grpc_closure_sched(exec_ctx, req->on_done, GRPC_ERROR_REF(error));
+ grpc_closure_sched(exec_ctx, req->on_done, error);
grpc_http_parser_destroy(&req->parser);
if (req->addresses != NULL) {
grpc_resolved_addresses_destroy(req->addresses);
@@ -244,7 +244,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req,
static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
internal_request *req = arg;
if (error != GRPC_ERROR_NONE) {
- finish(exec_ctx, req, error);
+ finish(exec_ctx, req, GRPC_ERROR_REF(error));
return;
}
req->next_address = 0;
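
These two httpcli.c hunks are two halves of one ownership fix: finish() now forwards its error argument without taking an extra ref, so it owns the ref it is given, and on_resolved, which only borrows its error from the closure machinery, must take its own ref before calling it. A toy refcount model of the convention (illustrative, not the grpc_error API):

    #include <stdlib.h>

    typedef struct { int refs; } toy_error;

    static toy_error *toy_error_ref(toy_error *e) { ++e->refs; return e; }
    static void toy_error_unref(toy_error *e) {
      if (--e->refs == 0) free(e);
    }

    /* finish() consumes one ref: it hands `e` off without ref'ing again */
    static void finish(toy_error *e) { toy_error_unref(e); }

    int main(void) {
      toy_error *borrowed = malloc(sizeof(*borrowed));
      borrowed->refs = 1;              /* ref held by the original owner */
      finish(toy_error_ref(borrowed)); /* borrower refs before handing off */
      toy_error_unref(borrowed);       /* owner drops its own ref */
      return 0;
    }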
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c
index 76946434f0..ea7c1122c1 100644
--- a/src/core/lib/http/httpcli_security_connector.c
+++ b/src/core/lib/http/httpcli_security_connector.c
@@ -44,6 +44,7 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
#include "src/core/tsi/ssl_transport_security.h"
+#include "src/core/tsi/transport_security_adapter.h"
typedef struct {
grpc_channel_security_connector base;
@@ -78,7 +79,8 @@ static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx,
}
grpc_handshake_manager_add(
handshake_mgr,
- grpc_security_handshaker_create(exec_ctx, handshaker, &sc->base));
+ grpc_security_handshaker_create(
+ exec_ctx, tsi_create_adapter_handshaker(handshaker), &sc->base));
}
static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
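
tsi_create_adapter_handshaker, pulled in here and again in the security_connector.c hunks below, appears to wrap a handshaker written against the older synchronous TSI calls (get-bytes / process-bytes) so that it presents the newer tsi_handshaker_next / tsi_handshaker_result interface which security_handshaker.c now drives. The call pattern is just a wrap at creation time, mirroring this patch:

    /* sketch, mirroring the pattern used throughout this commit */
    grpc_handshake_manager_add(
        handshake_mgr,
        grpc_security_handshaker_create(
            exec_ctx, tsi_create_adapter_handshaker(handshaker), &sc->base));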
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index 30431a4e4a..416a3bdb35 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -56,6 +56,7 @@
#include "src/core/lib/support/string.h"
#include "src/core/tsi/fake_transport_security.h"
#include "src/core/tsi/ssl_transport_security.h"
+#include "src/core/tsi/transport_security_adapter.h"
/* -- Constants. -- */
@@ -390,7 +391,8 @@ static void fake_channel_add_handshakers(
grpc_handshake_manager_add(
handshake_mgr,
grpc_security_handshaker_create(
- exec_ctx, tsi_create_fake_handshaker(true /* is_client */),
+ exec_ctx, tsi_create_adapter_handshaker(
+ tsi_create_fake_handshaker(true /* is_client */)),
&sc->base));
}
@@ -400,7 +402,8 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx,
grpc_handshake_manager_add(
handshake_mgr,
grpc_security_handshaker_create(
- exec_ctx, tsi_create_fake_handshaker(false /* is_client */),
+ exec_ctx, tsi_create_adapter_handshaker(
+ tsi_create_fake_handshaker(false /* is_client */)),
&sc->base));
}
@@ -495,8 +498,10 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx,
}
// Create handshakers.
- grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create(
- exec_ctx, tsi_hs, &sc->base));
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ grpc_security_handshaker_create(
+ exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base));
}
static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
@@ -515,8 +520,10 @@ static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
}
// Create handshakers.
- grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create(
- exec_ctx, tsi_hs, &sc->base));
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ grpc_security_handshaker_create(
+ exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base));
}
static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) {
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index 509b4b556d..3bc113e20f 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -71,12 +71,12 @@ typedef struct {
unsigned char *handshake_buffer;
size_t handshake_buffer_size;
- grpc_slice_buffer left_overs;
grpc_slice_buffer outgoing;
grpc_closure on_handshake_data_sent_to_peer;
grpc_closure on_handshake_data_received_from_peer;
grpc_closure on_peer_checked;
grpc_auth_context *auth_context;
+ tsi_handshaker_result *handshaker_result;
} security_handshaker;
static void security_handshaker_unref(grpc_exec_ctx *exec_ctx,
@@ -84,6 +84,7 @@ static void security_handshaker_unref(grpc_exec_ctx *exec_ctx,
if (gpr_unref(&h->refs)) {
gpr_mu_destroy(&h->mu);
tsi_handshaker_destroy(h->handshaker);
+ tsi_handshaker_result_destroy(h->handshaker_result);
if (h->endpoint_to_destroy != NULL) {
grpc_endpoint_destroy(exec_ctx, h->endpoint_to_destroy);
}
@@ -92,7 +93,6 @@ static void security_handshaker_unref(grpc_exec_ctx *exec_ctx,
gpr_free(h->read_buffer_to_destroy);
}
gpr_free(h->handshake_buffer);
- grpc_slice_buffer_destroy_internal(exec_ctx, &h->left_overs);
grpc_slice_buffer_destroy_internal(exec_ctx, &h->outgoing);
GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake");
GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, h->connector, "handshake");
@@ -150,10 +150,10 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error));
goto done;
}
- // Get frame protector.
+ // Create frame protector.
tsi_frame_protector *protector;
- tsi_result result =
- tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector);
+ tsi_result result = tsi_handshaker_result_create_frame_protector(
+ h->handshaker_result, NULL, &protector);
if (result != TSI_OK) {
error = grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"),
@@ -161,14 +161,25 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
security_handshake_failed_locked(exec_ctx, h, error);
goto done;
}
- // Success.
+ // Get unused bytes.
+ unsigned char *unused_bytes = NULL;
+ size_t unused_bytes_size = 0;
+ result = tsi_handshaker_result_get_unused_bytes(
+ h->handshaker_result, &unused_bytes, &unused_bytes_size);
// Create secure endpoint.
- h->args->endpoint = grpc_secure_endpoint_create(
- protector, h->args->endpoint, h->left_overs.slices, h->left_overs.count);
- h->left_overs.count = 0;
- h->left_overs.length = 0;
- // Clear out the read buffer before it gets passed to the transport,
- // since any excess bytes were already copied to h->left_overs.
+ if (unused_bytes_size > 0) {
+ grpc_slice slice =
+ grpc_slice_from_copied_buffer((char *)unused_bytes, unused_bytes_size);
+ h->args->endpoint =
+ grpc_secure_endpoint_create(protector, h->args->endpoint, &slice, 1);
+ grpc_slice_unref_internal(exec_ctx, slice);
+ } else {
+ h->args->endpoint =
+ grpc_secure_endpoint_create(protector, h->args->endpoint, NULL, 0);
+ }
+ tsi_handshaker_result_destroy(h->handshaker_result);
+ h->handshaker_result = NULL;
+ // Clear out the read buffer before it gets passed to the transport.
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, h->args->read_buffer);
// Add auth context to channel args.
grpc_arg auth_context_arg = grpc_auth_context_to_arg(h->auth_context);
@@ -189,7 +200,8 @@ done:
static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx,
security_handshaker *h) {
tsi_peer peer;
- tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer);
+ tsi_result result =
+ tsi_handshaker_result_extract_peer(h->handshaker_result, &peer);
if (result != TSI_OK) {
return grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Peer extraction failed"), result);
@@ -199,34 +211,87 @@ static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
-static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx,
- security_handshaker *h) {
- // Get data to send.
- tsi_result result = TSI_OK;
- size_t offset = 0;
- do {
- size_t to_send_size = h->handshake_buffer_size - offset;
- result = tsi_handshaker_get_bytes_to_send_to_peer(
- h->handshaker, h->handshake_buffer + offset, &to_send_size);
- offset += to_send_size;
- if (result == TSI_INCOMPLETE_DATA) {
- h->handshake_buffer_size *= 2;
- h->handshake_buffer =
- gpr_realloc(h->handshake_buffer, h->handshake_buffer_size);
- }
- } while (result == TSI_INCOMPLETE_DATA);
+static grpc_error *on_handshake_next_done_locked(
+ grpc_exec_ctx *exec_ctx, security_handshaker *h, tsi_result result,
+ const unsigned char *bytes_to_send, size_t bytes_to_send_size,
+ tsi_handshaker_result *handshaker_result) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ // Read more if we need to.
+ if (result == TSI_INCOMPLETE_DATA) {
+ GPR_ASSERT(bytes_to_send_size == 0);
+ grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
+ &h->on_handshake_data_received_from_peer);
+ return error;
+ }
if (result != TSI_OK) {
return grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result);
}
- // Send data.
- grpc_slice to_send =
- grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset);
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing);
- grpc_slice_buffer_add(&h->outgoing, to_send);
- grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing,
- &h->on_handshake_data_sent_to_peer);
- return GRPC_ERROR_NONE;
+ // Update handshaker result.
+ if (handshaker_result != NULL) {
+ GPR_ASSERT(h->handshaker_result == NULL);
+ h->handshaker_result = handshaker_result;
+ }
+ if (bytes_to_send_size > 0) {
+ // Send data to peer, if needed.
+ grpc_slice to_send = grpc_slice_from_copied_buffer(
+ (const char *)bytes_to_send, bytes_to_send_size);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing);
+ grpc_slice_buffer_add(&h->outgoing, to_send);
+ grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing,
+ &h->on_handshake_data_sent_to_peer);
+ } else if (handshaker_result == NULL) {
+ // There is nothing to send, but we still need to read from the peer.
+ grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
+ &h->on_handshake_data_received_from_peer);
+ } else {
+ // Handshake has finished; go on to check the peer.
+ error = check_peer_locked(exec_ctx, h);
+ }
+ return error;
+}
+
+static void on_handshake_next_done_grpc_wrapper(
+ tsi_result result, void *user_data, const unsigned char *bytes_to_send,
+ size_t bytes_to_send_size, tsi_handshaker_result *handshaker_result) {
+ security_handshaker *h = user_data;
+ // This callback will be invoked by TSI in a non-grpc thread, so it's
+ // safe to create our own exec_ctx here.
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_lock(&h->mu);
+ grpc_error *error =
+ on_handshake_next_done_locked(&exec_ctx, h, result, bytes_to_send,
+ bytes_to_send_size, handshaker_result);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_failed_locked(&exec_ctx, h, error);
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(&exec_ctx, h);
+ } else {
+ gpr_mu_unlock(&h->mu);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static grpc_error *do_handshaker_next_locked(
+ grpc_exec_ctx *exec_ctx, security_handshaker *h,
+ const unsigned char *bytes_received, size_t bytes_received_size) {
+ // Invoke TSI handshaker.
+ unsigned char *bytes_to_send = NULL;
+ size_t bytes_to_send_size = 0;
+ tsi_handshaker_result *handshaker_result = NULL;
+ tsi_result result = tsi_handshaker_next(
+ h->handshaker, bytes_received, bytes_received_size, &bytes_to_send,
+ &bytes_to_send_size, &handshaker_result,
+ &on_handshake_next_done_grpc_wrapper, h);
+ if (result == TSI_ASYNC) {
+ // Handshaker operating asynchronously. Nothing else to do here;
+ // callback will be invoked in a TSI thread.
+ return GRPC_ERROR_NONE;
+ }
+ // Handshaker returned synchronously. Invoke callback directly in
+ // this thread with our existing exec_ctx.
+ return on_handshake_next_done_locked(exec_ctx, h, result, bytes_to_send,
+ bytes_to_send_size, handshaker_result);
}
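
do_handshaker_next_locked encodes the new driving convention: tsi_handshaker_next may finish synchronously, returning its outputs in-place, or return TSI_ASYNC and deliver the same outputs later through the callback; either way the results funnel into on_handshake_next_done_locked. A minimal standalone model of that sync-or-async dispatch (toy types, not the TSI API):

    #include <stdbool.h>
    #include <stdio.h>

    typedef void (*done_cb)(int result, void *user_data);

    /* Toy "next" step: returns true when it completed synchronously; when
     * it returns false, `cb` fires later from another thread instead. */
    static bool toy_next(done_cb cb, void *user_data, int *sync_result) {
      (void)cb;
      (void)user_data;
      *sync_result = 0; /* this toy always completes inline */
      return true;
    }

    static void on_done(int result, void *user_data) {
      printf("step done: result=%d user_data=%p\n", result, user_data);
    }

    static void drive_one_step(void *h) {
      int result;
      if (!toy_next(on_done, h, &result)) {
        return;           /* async: on_done runs later, in its own thread */
      }
      on_done(result, h); /* sync: run the same continuation inline */
    }

    int main(void) {
      drive_one_step(NULL);
      return 0;
    }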
static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
@@ -241,72 +306,34 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
security_handshaker_unref(exec_ctx, h);
return;
}
- // Process received data.
- tsi_result result = TSI_OK;
- size_t consumed_slice_size = 0;
+ // Coalesce all received slices into one contiguous buffer.
size_t i;
+ size_t bytes_received_size = 0;
for (i = 0; i < h->args->read_buffer->count; i++) {
- consumed_slice_size = GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]);
- result = tsi_handshaker_process_bytes_from_peer(
- h->handshaker, GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]),
- &consumed_slice_size);
- if (!tsi_handshaker_is_in_progress(h->handshaker)) break;
+ bytes_received_size += GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]);
}
- if (tsi_handshaker_is_in_progress(h->handshaker)) {
- /* We may need more data. */
- if (result == TSI_INCOMPLETE_DATA) {
- grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
- &h->on_handshake_data_received_from_peer);
- goto done;
- } else {
- error = send_handshake_bytes_to_peer_locked(exec_ctx, h);
- if (error != GRPC_ERROR_NONE) {
- security_handshake_failed_locked(exec_ctx, h, error);
- gpr_mu_unlock(&h->mu);
- security_handshaker_unref(exec_ctx, h);
- return;
- }
- goto done;
- }
+ if (bytes_received_size > h->handshake_buffer_size) {
+ h->handshake_buffer = gpr_realloc(h->handshake_buffer, bytes_received_size);
+ h->handshake_buffer_size = bytes_received_size;
}
- if (result != TSI_OK) {
- security_handshake_failed_locked(
- exec_ctx, h,
- grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result));
- gpr_mu_unlock(&h->mu);
- security_handshaker_unref(exec_ctx, h);
- return;
- }
- /* Handshake is done and successful this point. */
- bool has_left_overs_in_current_slice =
- (consumed_slice_size <
- GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]));
- size_t num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) +
- h->args->read_buffer->count - i - 1;
- if (num_left_overs > 0) {
- /* Put the leftovers in our buffer (ownership transfered). */
- if (has_left_overs_in_current_slice) {
- grpc_slice tail = grpc_slice_split_tail(&h->args->read_buffer->slices[i],
- consumed_slice_size);
- grpc_slice_buffer_add(&h->left_overs, tail);
- /* split_tail above increments refcount. */
- grpc_slice_unref_internal(exec_ctx, tail);
- }
- grpc_slice_buffer_addn(
- &h->left_overs, &h->args->read_buffer->slices[i + 1],
- num_left_overs - (size_t)has_left_overs_in_current_slice);
+ size_t offset = 0;
+ for (i = 0; i < h->args->read_buffer->count; i++) {
+ size_t slice_size = GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]);
+ memcpy(h->handshake_buffer + offset,
+ GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]), slice_size);
+ offset += slice_size;
}
- // Check peer.
- error = check_peer_locked(exec_ctx, h);
+ // Call TSI handshaker.
+ error = do_handshaker_next_locked(exec_ctx, h, h->handshake_buffer,
+ bytes_received_size);
+
if (error != GRPC_ERROR_NONE) {
security_handshake_failed_locked(exec_ctx, h, error);
gpr_mu_unlock(&h->mu);
security_handshaker_unref(exec_ctx, h);
- return;
+ } else {
+ gpr_mu_unlock(&h->mu);
}
-done:
- gpr_mu_unlock(&h->mu);
}
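
The receive path above flattens however many slices arrived into one contiguous buffer before invoking the TSI handshaker, growing the scratch buffer only when a batch exceeds its current size. A self-contained sketch of that coalescing step (plain C, illustrative types):

    #include <stdlib.h>
    #include <string.h>

    typedef struct {
      const unsigned char *data;
      size_t len;
    } toy_slice;

    /* Copy `count` slices into *buf back to back, growing it if needed;
     * returns the total number of bytes copied. */
    static size_t coalesce(const toy_slice *slices, size_t count,
                           unsigned char **buf, size_t *buf_size) {
      size_t total = 0;
      for (size_t i = 0; i < count; ++i) total += slices[i].len;
      if (total > *buf_size) {
        *buf = realloc(*buf, total); /* sketch: no OOM handling */
        *buf_size = total;
      }
      size_t offset = 0;
      for (size_t i = 0; i < count; ++i) {
        memcpy(*buf + offset, slices[i].data, slices[i].len);
        offset += slices[i].len;
      }
      return total;
    }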
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg,
@@ -321,8 +348,8 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg,
security_handshaker_unref(exec_ctx, h);
return;
}
- /* We may be done. */
- if (tsi_handshaker_is_in_progress(h->handshaker)) {
+ // We may be done.
+ if (h->handshaker_result == NULL) {
grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
&h->on_handshake_data_received_from_peer);
} else {
@@ -371,7 +398,7 @@ static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
h->args = args;
h->on_handshake_done = on_handshake_done;
gpr_ref(&h->refs);
- grpc_error *error = send_handshake_bytes_to_peer_locked(exec_ctx, h);
+ grpc_error *error = do_handshaker_next_locked(exec_ctx, h, NULL, 0);
if (error != GRPC_ERROR_NONE) {
security_handshake_failed_locked(exec_ctx, h, error);
gpr_mu_unlock(&h->mu);
@@ -404,7 +431,6 @@ static grpc_handshaker *security_handshaker_create(
grpc_schedule_on_exec_ctx);
grpc_closure_init(&h->on_peer_checked, on_peer_checked, h,
grpc_schedule_on_exec_ctx);
- grpc_slice_buffer_init(&h->left_overs);
grpc_slice_buffer_init(&h->outgoing);
return &h->base;
}