author     Yuchen Zeng <zyc@google.com>    2017-06-06 13:26:36 -0700
committer  Yuchen Zeng <zyc@google.com>    2017-06-06 13:26:36 -0700
commit     b89b5011ecfe5135e03e98fb058e0ad345747007 (patch)
tree       c49d57c3c752150148840b672474ccfeb81d6294 /src/core
parent     4ebace71b071d9dd38a089c88b737b52fe57dfd5 (diff)
parent     b1332047d049d788a59284a561f4d6c2c2488792 (diff)
Merge remote-tracking branch 'upstream/master' into srv_record
Diffstat (limited to 'src/core')
-rw-r--r--  src/core/ext/census/intrusive_hash_map.c                                 | 320
-rw-r--r--  src/core/ext/census/intrusive_hash_map.h                                 | 167
-rw-r--r--  src/core/ext/census/intrusive_hash_map_internal.h                        |  63
-rw-r--r--  src/core/ext/filters/client_channel/channel_connectivity.c              |  77
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.c                    | 115
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.h                    |   6
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c | 459
-rw-r--r--  src/core/ext/transport/chttp2/server/chttp2_server.c                    |   1
-rw-r--r--  src/core/lib/http/httpcli.c                                             |   4
-rw-r--r--  src/core/lib/http/httpcli_security_connector.c                          |   4
-rw-r--r--  src/core/lib/security/transport/security_connector.c                    |  19
-rw-r--r--  src/core/lib/security/transport/security_handshaker.c                   | 220
12 files changed, 1034 insertions, 421 deletions
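
[Editor's note] Before the raw diffs: the first three files add a census intrusive hash map. The sketch below is a compilable variant of the EXAMPLE USAGE comment inside the new intrusive_hash_map.h, using only the API introduced in this merge; string_item and free_string_item are illustrative names, not part of the commit.

#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include "src/core/ext/census/intrusive_hash_map.h"

typedef struct string_item {
  INTRUSIVE_HASH_MAP_HEADER; /* key + hash link; must come first */
  char *str_buf;             /* user data follows the header */
  uint16_t len;
} string_item;

/* Frees only the user-owned buffer; intrusive_hash_map_clear() itself
   gpr_free()s each hm_item after calling this callback. */
static void free_string_item(void *p) {
  gpr_free(((string_item *)p)->str_buf);
}

int main(void) {
  intrusive_hash_map hash_map;
  intrusive_hash_map_init(&hash_map, 4); /* 2^4 = 16 initial buckets */

  string_item *item = (string_item *)gpr_malloc(sizeof(string_item));
  item->IHM_key = 10;
  item->IHM_hash_link = NULL; /* required before insertion */
  item->len = 5;
  item->str_buf = (char *)gpr_malloc(item->len);
  memcpy(item->str_buf, "test1", item->len);

  bool inserted = intrusive_hash_map_insert(&hash_map, (hm_item *)item);
  string_item *found = (string_item *)intrusive_hash_map_find(&hash_map, 10);

  intrusive_hash_map_free(&hash_map, free_string_item);
  return (inserted && found != NULL) ? 0 : 1;
}
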
diff --git a/src/core/ext/census/intrusive_hash_map.c b/src/core/ext/census/intrusive_hash_map.c new file mode 100644 index 0000000000..9f56b765e1 --- /dev/null +++ b/src/core/ext/census/intrusive_hash_map.c @@ -0,0 +1,320 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/census/intrusive_hash_map.h" +#include <string.h> + +extern bool hm_index_compare(const hm_index *A, const hm_index *B); + +/* Simple hashing function that takes lower 32 bits. */ +static __inline uint32_t chunked_vector_hasher(uint64_t key) { + return (uint32_t)key; +} + +/* Vector chunks are 1MiB divided by pointer size. */ +static const size_t VECTOR_CHUNK_SIZE = (1 << 20) / sizeof(void *); + +/* Helper functions which return buckets from the chunked vector. */ +static __inline void **get_mutable_bucket(const chunked_vector *buckets, + uint32_t index) { + if (index < VECTOR_CHUNK_SIZE) { + return &buckets->first_[index]; + } + size_t rest_index = (index - VECTOR_CHUNK_SIZE) / VECTOR_CHUNK_SIZE; + return &buckets->rest_[rest_index][index % VECTOR_CHUNK_SIZE]; +} + +static __inline void *get_bucket(const chunked_vector *buckets, + uint32_t index) { + if (index < VECTOR_CHUNK_SIZE) { + return buckets->first_[index]; + } + size_t rest_index = (index - VECTOR_CHUNK_SIZE) / VECTOR_CHUNK_SIZE; + return buckets->rest_[rest_index][index % VECTOR_CHUNK_SIZE]; +} + +/* Helper function. */ +static __inline size_t RestSize(const chunked_vector *vec) { + return (vec->size_ <= VECTOR_CHUNK_SIZE) + ? 0 + : (vec->size_ - VECTOR_CHUNK_SIZE - 1) / VECTOR_CHUNK_SIZE + 1; +} + +/* Initialize chunked vector to size of 0. */ +static void chunked_vector_init(chunked_vector *vec) { + vec->size_ = 0; + vec->first_ = NULL; + vec->rest_ = NULL; +} + +/* Clear chunked vector and free all memory that has been allocated then + initialize chunked vector. 
*/ +static void chunked_vector_clear(chunked_vector *vec) { + if (vec->first_ != NULL) { + gpr_free(vec->first_); + } + if (vec->rest_ != NULL) { + size_t rest_size = RestSize(vec); + for (size_t i = 0; i < rest_size; ++i) { + if (vec->rest_[i] != NULL) { + gpr_free(vec->rest_[i]); + } + } + gpr_free(vec->rest_); + } + chunked_vector_init(vec); +} + +/* Clear chunked vector and then resize it to n entries. Allow the first 1MB to + be read w/o an extra cache miss. The rest of the elements are stored in an + array of arrays to avoid large mallocs. */ +static void chunked_vector_reset(chunked_vector *vec, size_t n) { + chunked_vector_clear(vec); + vec->size_ = n; + if (n <= VECTOR_CHUNK_SIZE) { + vec->first_ = (void **)gpr_malloc(sizeof(void *) * n); + memset(vec->first_, 0, sizeof(void *) * n); + } else { + vec->first_ = (void **)gpr_malloc(sizeof(void *) * VECTOR_CHUNK_SIZE); + memset(vec->first_, 0, sizeof(void *) * VECTOR_CHUNK_SIZE); + size_t rest_size = RestSize(vec); + vec->rest_ = (void ***)gpr_malloc(sizeof(void **) * rest_size); + memset(vec->rest_, 0, sizeof(void **) * rest_size); + int i = 0; + n -= VECTOR_CHUNK_SIZE; + while (n > 0) { + size_t this_size = GPR_MIN(n, VECTOR_CHUNK_SIZE); + vec->rest_[i] = (void **)gpr_malloc(sizeof(void *) * this_size); + memset(vec->rest_[i], 0, sizeof(void *) * this_size); + n -= this_size; + ++i; + } + } +} + +void intrusive_hash_map_init(intrusive_hash_map *hash_map, + uint32_t initial_log2_table_size) { + hash_map->log2_num_buckets = initial_log2_table_size; + hash_map->num_items = 0; + uint32_t num_buckets = (uint32_t)1 << hash_map->log2_num_buckets; + hash_map->extend_threshold = num_buckets >> 1; + chunked_vector_init(&hash_map->buckets); + chunked_vector_reset(&hash_map->buckets, num_buckets); + hash_map->hash_mask = num_buckets - 1; +} + +bool intrusive_hash_map_empty(const intrusive_hash_map *hash_map) { + return hash_map->num_items == 0; +} + +size_t intrusive_hash_map_size(const intrusive_hash_map *hash_map) { + return hash_map->num_items; +} + +void intrusive_hash_map_end(const intrusive_hash_map *hash_map, hm_index *idx) { + idx->bucket_index = (uint32_t)hash_map->buckets.size_; + GPR_ASSERT(idx->bucket_index <= UINT32_MAX); + idx->item = NULL; +} + +void intrusive_hash_map_next(const intrusive_hash_map *hash_map, + hm_index *idx) { + idx->item = idx->item->hash_link; + while (idx->item == NULL) { + idx->bucket_index++; + if (idx->bucket_index >= hash_map->buckets.size_) { + /* Reached end of table. 
*/ + idx->item = NULL; + return; + } + idx->item = (hm_item *)get_bucket(&hash_map->buckets, idx->bucket_index); + } +} + +void intrusive_hash_map_begin(const intrusive_hash_map *hash_map, + hm_index *idx) { + for (uint32_t i = 0; i < hash_map->buckets.size_; ++i) { + if (get_bucket(&hash_map->buckets, i) != NULL) { + idx->bucket_index = i; + idx->item = (hm_item *)get_bucket(&hash_map->buckets, i); + return; + } + } + intrusive_hash_map_end(hash_map, idx); +} + +hm_item *intrusive_hash_map_find(const intrusive_hash_map *hash_map, + uint64_t key) { + uint32_t index = chunked_vector_hasher(key) & hash_map->hash_mask; + + hm_item *p = (hm_item *)get_bucket(&hash_map->buckets, index); + while (p != NULL) { + if (key == p->key) { + return p; + } + p = p->hash_link; + } + return NULL; +} + +hm_item *intrusive_hash_map_erase(intrusive_hash_map *hash_map, uint64_t key) { + uint32_t index = chunked_vector_hasher(key) & hash_map->hash_mask; + + hm_item **slot = (hm_item **)get_mutable_bucket(&hash_map->buckets, index); + hm_item *p = *slot; + if (p == NULL) { + return NULL; + } + + if (key == p->key) { + *slot = p->hash_link; + p->hash_link = NULL; + hash_map->num_items--; + return p; + } + + hm_item *prev = p; + p = p->hash_link; + + while (p) { + if (key == p->key) { + prev->hash_link = p->hash_link; + p->hash_link = NULL; + hash_map->num_items--; + return p; + } + prev = p; + p = p->hash_link; + } + return NULL; +} + +/* Insert an hm_item* into the underlying chunked vector. hash_mask is + * array_size-1. Returns true if it is a new hm_item and false if the hm_item + * already existed. + */ +static __inline bool intrusive_hash_map_internal_insert(chunked_vector *buckets, + uint32_t hash_mask, + hm_item *item) { + const uint64_t key = item->key; + uint32_t index = chunked_vector_hasher(key) & hash_mask; + hm_item **slot = (hm_item **)get_mutable_bucket(buckets, index); + hm_item *p = *slot; + item->hash_link = p; + + /* Check to see if key already exists. */ + while (p) { + if (p->key == key) { + return false; + } + p = p->hash_link; + } + + /* Otherwise add new entry. */ + *slot = item; + return true; +} + +/* Extend the allocated number of elements in the hash map by a factor of 2. */ +void intrusive_hash_map_extend(intrusive_hash_map *hash_map) { + uint32_t new_log2_num_buckets = 1 + hash_map->log2_num_buckets; + uint32_t new_num_buckets = (uint32_t)1 << new_log2_num_buckets; + GPR_ASSERT(new_num_buckets <= UINT32_MAX && new_num_buckets > 0); + chunked_vector new_buckets; + chunked_vector_init(&new_buckets); + chunked_vector_reset(&new_buckets, new_num_buckets); + uint32_t new_hash_mask = new_num_buckets - 1; + + hm_index cur_idx; + hm_index end_idx; + intrusive_hash_map_end(hash_map, &end_idx); + intrusive_hash_map_begin(hash_map, &cur_idx); + while (!hm_index_compare(&cur_idx, &end_idx)) { + hm_item *new_item = cur_idx.item; + intrusive_hash_map_next(hash_map, &cur_idx); + intrusive_hash_map_internal_insert(&new_buckets, new_hash_mask, new_item); + } + + /* Set values for new chunked_vector. extend_threshold is set to half of + * new_num_buckets. */ + hash_map->log2_num_buckets = new_log2_num_buckets; + chunked_vector_clear(&hash_map->buckets); + hash_map->buckets = new_buckets; + hash_map->hash_mask = new_hash_mask; + hash_map->extend_threshold = new_num_buckets >> 1; +} + +/* Insert a hm_item. The hm_item must remain live until it is removed from the + table. This object does not take the ownership of hm_item. 
The caller must + remove this hm_item from the table and delete it before this table is + deleted. If hm_item exists already num_items is not changed. */ +bool intrusive_hash_map_insert(intrusive_hash_map *hash_map, hm_item *item) { + if (hash_map->num_items >= hash_map->extend_threshold) { + intrusive_hash_map_extend(hash_map); + } + if (intrusive_hash_map_internal_insert(&hash_map->buckets, + hash_map->hash_mask, item)) { + hash_map->num_items++; + return true; + } + return false; +} + +void intrusive_hash_map_clear(intrusive_hash_map *hash_map, + void (*free_object)(void *)) { + hm_index cur; + hm_index end; + intrusive_hash_map_end(hash_map, &end); + intrusive_hash_map_begin(hash_map, &cur); + + while (!hm_index_compare(&cur, &end)) { + hm_index next = cur; + intrusive_hash_map_next(hash_map, &next); + if (cur.item != NULL) { + hm_item *item = intrusive_hash_map_erase(hash_map, cur.item->key); + (*free_object)((void *)item); + gpr_free(item); + } + cur = next; + } +} + +void intrusive_hash_map_free(intrusive_hash_map *hash_map, + void (*free_object)(void *)) { + intrusive_hash_map_clear(hash_map, (*free_object)); + hash_map->num_items = 0; + hash_map->extend_threshold = 0; + hash_map->log2_num_buckets = 0; + hash_map->hash_mask = 0; + chunked_vector_clear(&hash_map->buckets); +} diff --git a/src/core/ext/census/intrusive_hash_map.h b/src/core/ext/census/intrusive_hash_map.h new file mode 100644 index 0000000000..e316bf4b16 --- /dev/null +++ b/src/core/ext/census/intrusive_hash_map.h @@ -0,0 +1,167 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H +#define GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H + +#include "src/core/ext/census/intrusive_hash_map_internal.h" + +/* intrusive_hash_map is a fast chained hash table. This hash map is faster than + * a dense hash map when the application calls insert and erase more often than + * find. When the workload is dominated by find() a dense hash map may be + * faster. 
+ * + * intrusive_hash_map uses an intrusive header placed within a user defined + * struct. The header field IHM_key MUST be set to a valid value before + * insertion into the hash map or undefined behavior may occur. The header field + * IHM_hash_link MUST to be set to NULL initially. + * + * EXAMPLE USAGE: + * + * typedef struct string_item { + * INTRUSIVE_HASH_MAP_HEADER; + * // User data. + * char *str_buf; + * uint16_t len; + * } string_item; + * + * static string_item *make_string_item(uint64_t key, const char *buf, + * uint16_t len) { + * string_item *item = (string_item *)gpr_malloc(sizeof(string_item)); + * item->IHM_key = key; + * item->IHM_hash_link = NULL; + * item->len = len; + * item->str_buf = (char *)malloc(len); + * memcpy(item->str_buf, buf, len); + * return item; + * } + * + * intrusive_hash_map hash_map; + * intrusive_hash_map_init(&hash_map, 4); + * string_item *new_item1 = make_string_item(10, "test1", 5); + * bool ok = intrusive_hash_map_insert(&hash_map, (hm_item *)new_item1); + * + * string_item *item1 = + * (string_item *)intrusive_hash_map_find(&hash_map, 10); + */ + +/* Hash map item. Stores key and a pointer to the actual object. A user defined + * version of this can be passed in provided the first 2 entries (key and + * hash_link) are the same. These entries must be first in the user defined + * struct. Pointer to struct will need to be cast as (hm_item *) when passed to + * hash map. This allows it to be intrusive. */ +typedef struct hm_item { + uint64_t key; + struct hm_item *hash_link; + /* Optional user defined data after this. */ +} hm_item; + +/* Macro provided for ease of use. This must be first in the user defined + * struct (i.e. uint64_t key and hm_item * must be the first two elements in + * that order). */ +#define INTRUSIVE_HASH_MAP_HEADER \ + uint64_t IHM_key; \ + struct hm_item *IHM_hash_link + +/* Index struct which acts as a pseudo-iterator within the hash map. */ +typedef struct hm_index { + uint32_t bucket_index; // hash map bucket index. + hm_item *item; // Pointer to hm_item within the hash map. +} hm_index; + +/* Returns true if two hm_indices point to the same object within the hash map + * and false otherwise. */ +__inline bool hm_index_compare(const hm_index *A, const hm_index *B) { + return (A->item == B->item && A->bucket_index == B->bucket_index); +} + +/* + * Helper functions for iterating over the hash map. + */ + +/* On return idx will contain an invalid index which is always equal to + * hash_map->buckets.size_ */ +void intrusive_hash_map_end(const intrusive_hash_map *hash_map, hm_index *idx); + +/* Iterates index to the next valid entry in the hash map and stores the + * index within idx. If end of table is reached, idx will contain the same + * values as if intrusive_hash_map_end() was called. */ +void intrusive_hash_map_next(const intrusive_hash_map *hash_map, hm_index *idx); + +/* On return, idx will contain the index of the first non-null entry in the hash + * map. If the hash map is empty, idx will contain the same values as if + * intrusive_hash_map_end() was called. */ +void intrusive_hash_map_begin(const intrusive_hash_map *hash_map, + hm_index *idx); + +/* Initialize intrusive hash map data structure. This must be called before + * the hash map can be used. The initial size of an intrusive hash map will be + * 2^initial_log2_map_size (valid range is [0, 31]). 
*/ +void intrusive_hash_map_init(intrusive_hash_map *hash_map, + uint32_t initial_log2_map_size); + +/* Returns true if the hash map is empty and false otherwise. */ +bool intrusive_hash_map_empty(const intrusive_hash_map *hash_map); + +/* Returns the number of elements currently in the hash map. */ +size_t intrusive_hash_map_size(const intrusive_hash_map *hash_map); + +/* Find a hm_item within the hash map by key. Returns NULL if item was not + * found. */ +hm_item *intrusive_hash_map_find(const intrusive_hash_map *hash_map, + uint64_t key); + +/* Erase the hm_item that corresponds with key. If the hm_item is found, return + * the pointer to the hm_item. Else returns NULL. */ +hm_item *intrusive_hash_map_erase(intrusive_hash_map *hash_map, uint64_t key); + +/* Attempts to insert a new hm_item into the hash map. If an element with the + * same key already exists, it will not insert the new item and return false. + * Otherwise, it will insert the new item and return true. */ +bool intrusive_hash_map_insert(intrusive_hash_map *hash_map, hm_item *item); + +/* Clears entire contents of the hash map, but leaves internal data structure + * untouched. Second argument takes a function pointer to a function that will + * free the object designated by the user and pointed to by hash_map->value. */ +void intrusive_hash_map_clear(intrusive_hash_map *hash_map, + void (*free_object)(void *)); + +/* Erase all contents of hash map and free the memory. Hash map is invalid + * after calling this function and cannot be used until it has been + * reinitialized (intrusive_hash_map_init()). This function takes a function + * pointer to a function that will free the object designated by the user and + * pointed to by hash_map->value. */ +void intrusive_hash_map_free(intrusive_hash_map *hash_map, + void (*free_object)(void *)); + +#endif /* GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H */ diff --git a/src/core/ext/census/intrusive_hash_map_internal.h b/src/core/ext/census/intrusive_hash_map_internal.h new file mode 100644 index 0000000000..76a9a3a722 --- /dev/null +++ b/src/core/ext/census/intrusive_hash_map_internal.h @@ -0,0 +1,63 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H +#define GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> +#include <stdbool.h> + +/* The chunked vector is a data structure that allocates buckets for use in the + * hash map. ChunkedVector is logically equivalent to T*[N] (cast void* as + * T*). It's internally implemented as an array of 1MB arrays to avoid + * allocating large consecutive memory chunks. This is an internal data + * structure that should never be accessed directly. */ +typedef struct chunked_vector { + size_t size_; + void **first_; + void ***rest_; +} chunked_vector; + +/* Core intrusive hash map data structure. All internal elements are managed by + * functions and should not be altered manually. */ +typedef struct intrusive_hash_map { + uint32_t num_items; + uint32_t extend_threshold; + uint32_t log2_num_buckets; + uint32_t hash_mask; + chunked_vector buckets; +} intrusive_hash_map; + +#endif /* GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H */ diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index f83670db82..04666edbec 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -67,9 +67,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( typedef enum { WAITING, - CALLING_BACK, + READY_TO_CALL_BACK, CALLING_BACK_AND_FINISHED, - CALLED_BACK } callback_phase; typedef struct { @@ -77,11 +76,13 @@ typedef struct { callback_phase phase; grpc_closure on_complete; grpc_closure on_timeout; + grpc_closure watcher_timer_init; grpc_timer alarm; grpc_connectivity_state state; grpc_completion_queue *cq; grpc_cq_completion completion_storage; grpc_channel *channel; + grpc_error *error; void *tag; } state_watcher; @@ -105,11 +106,8 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, gpr_mu_lock(&w->mu); switch (w->phase) { case WAITING: - case CALLED_BACK: + case READY_TO_CALL_BACK: GPR_UNREACHABLE_CODE(return ); - case CALLING_BACK: - w->phase = CALLED_BACK; - break; case CALLING_BACK_AND_FINISHED: delete = 1; break; @@ -123,10 +121,14 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, bool due_to_completion, grpc_error *error) { - int delete = 0; - if (due_to_completion) { grpc_timer_cancel(exec_ctx, &w->alarm); + } else { + grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(w->channel)); + grpc_client_channel_watch_connectivity_state(exec_ctx, client_channel_elem, + grpc_cq_pollset(w->cq), NULL, + &w->on_complete, NULL); } gpr_mu_lock(&w->mu); @@ -147,25 +149,27 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, } switch (w->phase) { case WAITING: - w->phase = 
CALLING_BACK; - grpc_cq_end_op(exec_ctx, w->cq, w->tag, GRPC_ERROR_REF(error), - finished_completion, w, &w->completion_storage); + GRPC_ERROR_REF(error); + w->error = error; + w->phase = READY_TO_CALL_BACK; break; - case CALLING_BACK: + case READY_TO_CALL_BACK: + if (error != GRPC_ERROR_NONE) { + GPR_ASSERT(!due_to_completion); + GRPC_ERROR_UNREF(w->error); + GRPC_ERROR_REF(error); + w->error = error; + } w->phase = CALLING_BACK_AND_FINISHED; + grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w, + &w->completion_storage); break; case CALLING_BACK_AND_FINISHED: GPR_UNREACHABLE_CODE(return ); - case CALLED_BACK: - delete = 1; break; } gpr_mu_unlock(&w->mu); - if (delete) { - delete_state_watcher(exec_ctx, w); - } - GRPC_ERROR_UNREF(error); } @@ -179,6 +183,28 @@ static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error)); } +int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + return grpc_client_channel_num_external_connectivity_watchers( + client_channel_elem); +} + +typedef struct watcher_timer_init_arg { + state_watcher *w; + gpr_timespec deadline; +} watcher_timer_init_arg; + +static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg; + + grpc_timer_init(exec_ctx, &wa->w->alarm, + gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC), + &wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_free(wa); +} + void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { @@ -208,16 +234,19 @@ void grpc_channel_watch_connectivity_state( w->cq = cq; w->tag = tag; w->channel = channel; + w->error = NULL; - grpc_timer_init(&exec_ctx, &w->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + watcher_timer_init_arg *wa = gpr_malloc(sizeof(watcher_timer_init_arg)); + wa->w = w; + wa->deadline = deadline; + grpc_closure_init(&w->watcher_timer_init, watcher_timer_init, wa, + grpc_schedule_on_exec_ctx); if (client_channel_elem->filter == &grpc_client_channel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); - grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem, - grpc_cq_pollset(cq), &w->state, - &w->on_complete); + grpc_client_channel_watch_connectivity_state( + &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state, + &w->on_complete, &w->watcher_timer_init); } else { abort(); } diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index f2f27b9175..8cebbe9eca 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -167,6 +167,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) { return value; } +struct external_connectivity_watcher; + /************************************************************************* * CHANNEL-WIDE FUNCTIONS */ @@ -204,6 +206,11 @@ typedef struct client_channel_channel_data { /** interested parties (owned) */ grpc_pollset_set *interested_parties; + /* external_connectivity_watcher_list head is guarded by its own mutex, since + * counts need to be grabbed 
immediately without polling on a cq */ + gpr_mu external_connectivity_watcher_list_mu; + struct external_connectivity_watcher *external_connectivity_watcher_list_head; + /* the following properties are guarded by a mutex since API's require them to be instantaneously available */ gpr_mu info_mu; @@ -661,6 +668,12 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, // Initialize data members. chand->combiner = grpc_combiner_create(NULL); gpr_mu_init(&chand->info_mu); + gpr_mu_init(&chand->external_connectivity_watcher_list_mu); + + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + chand->external_connectivity_watcher_list_head = NULL; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, on_resolver_result_changed_locked, chand, @@ -749,6 +762,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_pollset_set_destroy(exec_ctx, chand->interested_parties); GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel"); gpr_mu_destroy(&chand->info_mu); + gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu); } /************************************************************************* @@ -1431,14 +1445,79 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( return out; } -typedef struct { +typedef struct external_connectivity_watcher { channel_data *chand; grpc_pollset *pollset; grpc_closure *on_complete; + grpc_closure *watcher_timer_init; grpc_connectivity_state *state; grpc_closure my_closure; + struct external_connectivity_watcher *next; } external_connectivity_watcher; +static external_connectivity_watcher *lookup_external_connectivity_watcher( + channel_data *chand, grpc_closure *on_complete) { + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL && w->on_complete != on_complete) { + w = w->next; + } + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return w; +} + +static void external_connectivity_watcher_list_append( + channel_data *chand, external_connectivity_watcher *w) { + GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); + + gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); + GPR_ASSERT(!w->next); + w->next = chand->external_connectivity_watcher_list_head; + chand->external_connectivity_watcher_list_head = w; + gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu); +} + +static void external_connectivity_watcher_list_remove( + channel_data *chand, external_connectivity_watcher *too_remove) { + GPR_ASSERT( + lookup_external_connectivity_watcher(chand, too_remove->on_complete)); + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + if (too_remove == chand->external_connectivity_watcher_list_head) { + chand->external_connectivity_watcher_list_head = too_remove->next; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return; + } + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL) { + if (w->next == too_remove) { + w->next = w->next->next; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return; + } + w = w->next; + } + GPR_UNREACHABLE_CODE(return ); +} + +int grpc_client_channel_num_external_connectivity_watchers( + grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + int count = 0; + + 
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL) { + count++; + w = w->next; + } + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + + return count; +} + static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { external_connectivity_watcher *w = arg; @@ -1447,6 +1526,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, w->pollset); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); + external_connectivity_watcher_list_remove(w->chand, w); gpr_free(w); grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); } @@ -1454,21 +1534,42 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { external_connectivity_watcher *w = arg; - grpc_closure_init(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); + external_connectivity_watcher *found = NULL; + if (w->state != NULL) { + external_connectivity_watcher_list_append(w->chand, w); + grpc_closure_run(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); + } else { + GPR_ASSERT(w->watcher_timer_init == NULL); + found = lookup_external_connectivity_watcher(w->chand, w->on_complete); + if (found) { + GPR_ASSERT(found->on_complete == w->on_complete); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure); + } + grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, + w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + "external_connectivity_watcher"); + gpr_free(w); + } } void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *closure) { + grpc_connectivity_state *state, grpc_closure *closure, + grpc_closure *watcher_timer_init) { channel_data *chand = elem->channel_data; - external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + external_connectivity_watcher *w = gpr_zalloc(sizeof(*w)); w->chand = chand; w->pollset = pollset; w->on_complete = closure; w->state = state; + w->watcher_timer_init = watcher_timer_init; + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 8d2490ea55..356a7ab0c1 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -53,9 +53,13 @@ extern const grpc_channel_filter grpc_client_channel_filter; grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); +int grpc_client_channel_num_external_connectivity_watchers( + grpc_channel_element *elem); + void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx 
*exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete); + grpc_connectivity_state *state, grpc_closure *on_complete, + grpc_closure *watcher_timer_init); /* Debug helper: pull the subchannel call from a call stack element */ grpc_subchannel_call *grpc_client_channel_get_subchannel_call( diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 6e7f410635..7ee6ffb787 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -99,26 +99,13 @@ typedef struct pending_pick { grpc_closure *on_complete; } pending_pick; -/** List of subchannels in a connectivity READY state */ -typedef struct ready_list { - grpc_subchannel *subchannel; - /* references namesake entry in subchannel_data */ - void *user_data; - struct ready_list *next; - struct ready_list *prev; -} ready_list; - typedef struct { - /** index within policy->subchannels */ - size_t index; /** backpointer to owning policy */ round_robin_lb_policy *policy; /** subchannel itself */ grpc_subchannel *subchannel; /** notification that connectivity has changed on subchannel */ grpc_closure connectivity_changed_closure; - /** this subchannels current position in subchannel->ready_list */ - ready_list *ready_list_node; /** last observed connectivity. Not updated by * \a grpc_subchannel_notify_on_state_change. Used to determine the previous * state while processing the new state in \a rr_connectivity_changed */ @@ -126,6 +113,10 @@ typedef struct { /** current connectivity state. Updated by \a * grpc_subchannel_notify_on_state_change */ grpc_connectivity_state curr_connectivity_state; + /** connectivity state to be updated by the watcher, not guarded by + * the combiner. Will be moved to curr_connectivity_state inside of + * the combiner by rr_connectivity_changed_locked(). */ + grpc_connectivity_state pending_connectivity_state_unsafe; /** the subchannel's target user data */ void *user_data; /** vtable to operate over \a user_data */ @@ -141,182 +132,106 @@ struct round_robin_lb_policy { /** all our subchannels */ size_t num_subchannels; - subchannel_data **subchannels; + subchannel_data *subchannels; - /** how many subchannels are in TRANSIENT_FAILURE */ + /** how many subchannels are in state READY */ + size_t num_ready; + /** how many subchannels are in state TRANSIENT_FAILURE */ size_t num_transient_failures; - /** how many subchannels are IDLE */ + /** how many subchannels are in state IDLE */ size_t num_idle; /** have we started picking? */ - int started_picking; + bool started_picking; /** are we shutting down? */ - int shutdown; + bool shutdown; /** List of picks that are waiting on connectivity */ pending_pick *pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; - /** (Dummy) root of the doubly linked list containing READY subchannels */ - ready_list ready_list; - /** Last pick from the ready list. */ - ready_list *ready_list_last_pick; + // Index into subchannels for last pick. + size_t last_ready_subchannel_index; }; -/** Returns the next subchannel from the connected list or NULL if the list is - * empty. +/** Returns the index into p->subchannels of the next subchannel in + * READY state, or p->num_subchannels if no subchannel is READY. 
* - * Note that this function does *not* advance p->ready_list_last_pick. Use \a - * advance_last_picked_locked() for that. */ -static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { - ready_list *selected; - selected = p->ready_list_last_pick->next; - - while (selected != NULL) { - if (selected == &p->ready_list) { - GPR_ASSERT(selected->subchannel == NULL); - /* skip dummy root */ - selected = selected->next; - } else { - GPR_ASSERT(selected->subchannel != NULL); - return selected; - } + * Note that this function does *not* update p->last_ready_subchannel_index. + * The caller must do that if it returns a pick. */ +static size_t get_next_ready_subchannel_index_locked( + const round_robin_lb_policy *p) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, + "[RR: %p] getting next ready subchannel, " + "last_ready_subchannel_index=%lu", + p, (unsigned long)p->last_ready_subchannel_index); } - return NULL; -} - -/** Advance the \a ready_list picking head. */ -static void advance_last_picked_locked(round_robin_lb_policy *p) { - if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ - p->ready_list_last_pick = p->ready_list_last_pick->next; - if (p->ready_list_last_pick == &p->ready_list) { - /* skip dummy root */ - p->ready_list_last_pick = p->ready_list_last_pick->next; + for (size_t i = 0; i < p->num_subchannels; ++i) { + const size_t index = + (i + p->last_ready_subchannel_index + 1) % p->num_subchannels; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p, + (unsigned long)index, + p->subchannels[index].curr_connectivity_state); + } + if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu", + p, (unsigned long)index); + } + return index; } - } else { /* should be an empty list */ - GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, " - "CSC %p)", - (void *)p, (void *)p->ready_list_last_pick, - (void *)p->ready_list_last_pick->subchannel, - (void *)grpc_subchannel_get_connected_subchannel( - p->ready_list_last_pick->subchannel)); + gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p); } + return p->num_subchannels; } -/** Prepends (relative to the root at p->ready_list) the connected subchannel \a - * csc to the list of ready subchannels. */ -static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - subchannel_data *sd) { - ready_list *new_elem = gpr_zalloc(sizeof(ready_list)); - new_elem->subchannel = sd->subchannel; - new_elem->user_data = sd->user_data; - if (p->ready_list.prev == NULL) { - /* first element */ - new_elem->next = &p->ready_list; - new_elem->prev = &p->ready_list; - p->ready_list.next = new_elem; - p->ready_list.prev = new_elem; - } else { - new_elem->next = &p->ready_list; - new_elem->prev = p->ready_list.prev; - p->ready_list.prev->next = new_elem; - p->ready_list.prev = new_elem; - } +// Sets p->last_ready_subchannel_index to last_ready_index. 
+static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, + size_t last_ready_index) { + GPR_ASSERT(last_ready_index < p->num_subchannels); + p->last_ready_subchannel_index = last_ready_index; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)", - (void *)new_elem, (void *)sd->subchannel); - } - return new_elem; -} - -/** Removes \a node from the list of connected subchannels */ -static void remove_disconnected_sc_locked(round_robin_lb_policy *p, - ready_list *node) { - if (node == NULL) { - return; - } - if (node == p->ready_list_last_pick) { - p->ready_list_last_pick = p->ready_list_last_pick->prev; - } - - /* removing last item */ - if (node->next == &p->ready_list && node->prev == &p->ready_list) { - GPR_ASSERT(p->ready_list.next == node); - GPR_ASSERT(p->ready_list.prev == node); - p->ready_list.next = NULL; - p->ready_list.prev = NULL; - } else { - node->prev->next = node->next; - node->next->prev = node->prev; - } - - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node, - (void *)node->subchannel); + gpr_log(GPR_DEBUG, + "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", + (void *)p, (unsigned long)last_ready_index, + (void *)p->subchannels[last_ready_index].subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->subchannels[last_ready_index].subchannel)); } - - node->next = NULL; - node->prev = NULL; - node->subchannel = NULL; - - gpr_free(node); -} - -static bool is_ready_list_empty(round_robin_lb_policy *p) { - return p->ready_list.prev == NULL; } static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - ready_list *elem; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); } - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } } - gpr_free(sd); } - grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); - - elem = p->ready_list.next; - while (elem != NULL && elem != &p->ready_list) { - ready_list *tmp; - tmp = elem->next; - elem->next = NULL; - elem->prev = NULL; - elem->subchannel = NULL; - gpr_free(elem); - elem = tmp; - } - gpr_free(p); } static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - size_t i; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); } - - p->shutdown = 1; + p->shutdown = true; + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; @@ -328,10 +243,13 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), 
"rr_shutdown"); - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, - &sd->connectivity_changed_closure); + for (size_t i = 0; i < p->num_subchannels; i++) { + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, + NULL, + &sd->connectivity_changed_closure); + } } } @@ -339,8 +257,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -364,8 +281,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_eq, grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -387,21 +303,16 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { - size_t i; - p->started_picking = 1; - - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - /* use some sentinel value outside of the range of grpc_connectivity_state - * to signal an undefined previous state. We won't be referring to this - * value again and it'll be overwritten after the first call to - * rr_connectivity_changed */ - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + p->started_picking = true; + for (size_t i = 0; i < p->num_subchannels; i++) { + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); + } } } @@ -418,36 +329,32 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - ready_list *selected; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); } - - if ((selected = peek_next_connected_locked(p))) { + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->num_subchannels) { /* readily available, report right away */ + subchannel_data *sd = &p->subchannels[next_ready_index]; *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "rr_picked"); - + grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked"); if (user_data != NULL) { - *user_data = selected->user_data; + *user_data = sd->user_data; } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - 
"[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", - (void *)*target, (void *)selected); + "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)", + (void *)*target, (unsigned long)next_ready_index); } /* only advance the last picked pointer if the selection was used */ - advance_last_picked_locked(p); + update_last_ready_subchannel_index_locked(p, next_ready_index); return 1; } else { /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { start_picking_locked(exec_ctx, p); } - pp = gpr_malloc(sizeof(*pp)); + pending_pick *pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->target = target; pp->on_complete = on_complete; @@ -458,25 +365,31 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } -static void update_state_counters(subchannel_data *sd) { +static void update_state_counters_locked(subchannel_data *sd) { round_robin_lb_policy *p = sd->policy; - - /* update p->num_transient_failures (resp. p->num_idle): if the previous - * state was TRANSIENT_FAILURE (resp. IDLE), decrement - * p->num_transient_failures (resp. p->num_idle). */ - if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { + GPR_ASSERT(p->num_ready > 0); + --p->num_ready; + } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(p->num_transient_failures > 0); --p->num_transient_failures; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { GPR_ASSERT(p->num_idle > 0); --p->num_idle; } + if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + ++p->num_ready; + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + ++p->num_transient_failures; + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { + ++p->num_idle; + } } /* sd is the subchannel_data associted with the updated subchannel. * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE * or SHUTDOWN */ -static grpc_connectivity_state update_lb_connectivity_status( +static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). @@ -498,7 +411,7 @@ static grpc_connectivity_state update_lb_connectivity_status( * CHECK: p->num_idle == p->num_subchannels. */ round_robin_lb_policy *p = sd->policy; - if (!is_ready_list_empty(p)) { /* 1) READY */ + if (p->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); return GRPC_CHANNEL_READY; @@ -532,32 +445,62 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; - pending_pick *pp; - - GRPC_ERROR_REF(error); - + // Now that we're inside the combiner, copy the pending connectivity + // state (which was set by the connectivity state watcher) to + // curr_connectivity_state, which is what we use inside of the combiner. + sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] connectivity changed for subchannel %p: " + "prev_state=%d new_state=%d", + p, sd->subchannel, sd->prev_connectivity_state, + sd->curr_connectivity_state); + } + // If we're shutting down, unref and return. 
if (p->shutdown) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - GRPC_ERROR_UNREF(error); return; } - switch (sd->curr_connectivity_state) { - case GRPC_CHANNEL_INIT: - GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_READY: - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd); + // Update state counters and determine new overall state. + update_state_counters_locked(sd); + sd->prev_connectivity_state = sd->curr_connectivity_state; + grpc_connectivity_state new_connectivity_state = + update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); + // If the new state is SHUTDOWN, unref the subchannel, and if the new + // overall state is SHUTDOWN, clean up. + if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); + sd->subchannel = NULL; + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } + if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + /* the policy is shutting down. Flush all the pending picks... */ + pending_pick *pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + gpr_free(pp); + } + } + /* unref the "rr_connectivity" weak ref from start_picking */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); + } else { + if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. 
*/ - ready_list *selected = peek_next_connected_locked(p); - GPR_ASSERT(selected != NULL); + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + GPR_ASSERT(next_ready_index < p->num_subchannels); + subchannel_data *selected = &p->subchannels[next_ready_index]; if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ - advance_last_picked_locked(p); + update_last_ready_subchannel_index_locked(p, next_ready_index); } + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( @@ -568,72 +511,20 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - (void *)selected->subchannel, (void *)selected); + "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %lu)", + (void *)selected->subchannel, + (unsigned long)next_ready_index); } grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_IDLE: - ++p->num_idle; - /* fallthrough */ - case GRPC_CHANNEL_CONNECTING: - update_state_counters(sd); - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - ++p->num_transient_failures; - /* remove from ready list if still present */ - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_SHUTDOWN: - update_state_counters(sd); - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - --p->num_subchannels; - GPR_SWAP(subchannel_data *, p->subchannels[sd->index], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); - p->subchannels[sd->index]->index = sd->index; - if (update_lb_connectivity_status(exec_ctx, sd, error) == - GRPC_CHANNEL_SHUTDOWN) { - /* the policy is shutting down. Flush all the pending picks... 
*/ - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } - } - gpr_free(sd); - /* unref the "rr_connectivity" weak ref from start_picking */ - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - break; + } + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } - GRPC_ERROR_UNREF(error); } static grpc_connectivity_state rr_check_connectivity_locked( @@ -654,10 +545,10 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - ready_list *selected; - grpc_connected_subchannel *target; - if ((selected = peek_next_connected_locked(p))) { - target = GRPC_CONNECTED_SUBCHANNEL_REF( + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->num_subchannels) { + subchannel_data *selected = &p->subchannels[next_ready_index]; + grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); @@ -708,7 +599,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; - size_t subchannel_idx = 0; + size_t subchannel_index = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { /* Skip balancer addresses, since we only know how to handle backends. */ if (addresses->addresses[i].is_balancer) continue; @@ -727,42 +618,44 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "Created subchannel %p for address uri %s", - (void *)subchannel, address_uri); + gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s", + (unsigned long)subchannel_index, (void *)subchannel, address_uri); gpr_free(address_uri); } grpc_channel_args_destroy(exec_ctx, new_args); if (subchannel != NULL) { - subchannel_data *sd = gpr_zalloc(sizeof(*sd)); - p->subchannels[subchannel_idx] = sd; + subchannel_data *sd = &p->subchannels[subchannel_index]; sd->policy = p; - sd->index = subchannel_idx; sd->subchannel = subchannel; + /* use some sentinel value outside of the range of grpc_connectivity_state + * to signal an undefined previous state. We won't be referring to this + * value again and it'll be overwritten after the first call to + * rr_connectivity_changed */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; sd->user_data_vtable = addresses->user_data_vtable; if (sd->user_data_vtable != NULL) { sd->user_data = sd->user_data_vtable->copy(addresses->addresses[i].user_data); } - ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed_locked, sd, grpc_combiner_scheduler(args->combiner, false)); + ++subchannel_index; } } - if (subchannel_idx == 0) { + if (subchannel_index == 0) { /* couldn't create any subchannel. 
Bail out */ gpr_free(p->subchannels); gpr_free(p); return NULL; } - p->num_subchannels = subchannel_idx; + p->num_subchannels = subchannel_index; - /* The (dummy node) root of the ready list */ - p->ready_list.subchannel = NULL; - p->ready_list.prev = NULL; - p->ready_list.next = NULL; - p->ready_list_last_pick = &p->ready_list; + // Initialize the last pick index to the last subchannel, so that the + // first pick will start at the beginning of the list. + p->last_ready_subchannel_index = subchannel_index - 1; grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index b9c62c376a..2c076e821c 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -127,6 +127,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, gpr_mu_lock(&state->mu); if (state->shutdown) { gpr_mu_unlock(&state->mu); + grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_NONE); grpc_endpoint_destroy(exec_ctx, tcp); gpr_free(acceptor); return; diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 0ac2c2ad52..7012ffe568 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -105,7 +105,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_error *error) { grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent, req->context->pollset_set); - grpc_closure_sched(exec_ctx, req->on_done, GRPC_ERROR_REF(error)); + grpc_closure_sched(exec_ctx, req->on_done, error); grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); @@ -244,7 +244,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { internal_request *req = arg; if (error != GRPC_ERROR_NONE) { - finish(exec_ctx, req, error); + finish(exec_ctx, req, GRPC_ERROR_REF(error)); return; } req->next_address = 0; diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 76946434f0..ea7c1122c1 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -44,6 +44,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" #include "src/core/tsi/ssl_transport_security.h" +#include "src/core/tsi/transport_security_adapter.h" typedef struct { grpc_channel_security_connector base; @@ -78,7 +79,8 @@ static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx, } grpc_handshake_manager_add( handshake_mgr, - grpc_security_handshaker_create(exec_ctx, handshaker, &sc->base)); + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(handshaker), &sc->base)); } static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 30431a4e4a..416a3bdb35 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -56,6 +56,7 @@ #include "src/core/lib/support/string.h" #include "src/core/tsi/fake_transport_security.h" #include "src/core/tsi/ssl_transport_security.h" +#include 
"src/core/tsi/transport_security_adapter.h" /* -- Constants. -- */ @@ -390,7 +391,8 @@ static void fake_channel_add_handshakers( grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_create_fake_handshaker(true /* is_client */), + exec_ctx, tsi_create_adapter_handshaker( + tsi_create_fake_handshaker(true /* is_client */)), &sc->base)); } @@ -400,7 +402,8 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx, grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_create_fake_handshaker(false /* is_client */), + exec_ctx, tsi_create_adapter_handshaker( + tsi_create_fake_handshaker(false /* is_client */)), &sc->base)); } @@ -495,8 +498,10 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx, } // Create handshakers. - grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_hs, &sc->base)); + grpc_handshake_manager_add( + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base)); } static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx, @@ -515,8 +520,10 @@ static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx, } // Create handshakers. - grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_hs, &sc->base)); + grpc_handshake_manager_add( + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base)); } static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) { diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 509b4b556d..3bc113e20f 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -71,12 +71,12 @@ typedef struct { unsigned char *handshake_buffer; size_t handshake_buffer_size; - grpc_slice_buffer left_overs; grpc_slice_buffer outgoing; grpc_closure on_handshake_data_sent_to_peer; grpc_closure on_handshake_data_received_from_peer; grpc_closure on_peer_checked; grpc_auth_context *auth_context; + tsi_handshaker_result *handshaker_result; } security_handshaker; static void security_handshaker_unref(grpc_exec_ctx *exec_ctx, @@ -84,6 +84,7 @@ static void security_handshaker_unref(grpc_exec_ctx *exec_ctx, if (gpr_unref(&h->refs)) { gpr_mu_destroy(&h->mu); tsi_handshaker_destroy(h->handshaker); + tsi_handshaker_result_destroy(h->handshaker_result); if (h->endpoint_to_destroy != NULL) { grpc_endpoint_destroy(exec_ctx, h->endpoint_to_destroy); } @@ -92,7 +93,6 @@ static void security_handshaker_unref(grpc_exec_ctx *exec_ctx, gpr_free(h->read_buffer_to_destroy); } gpr_free(h->handshake_buffer); - grpc_slice_buffer_destroy_internal(exec_ctx, &h->left_overs); grpc_slice_buffer_destroy_internal(exec_ctx, &h->outgoing); GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake"); GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, h->connector, "handshake"); @@ -150,10 +150,10 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); goto done; } - // Get frame protector. + // Create frame protector. 
tsi_frame_protector *protector; - tsi_result result = - tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector); + tsi_result result = tsi_handshaker_result_create_frame_protector( + h->handshaker_result, NULL, &protector); if (result != TSI_OK) { error = grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"), @@ -161,14 +161,25 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, security_handshake_failed_locked(exec_ctx, h, error); goto done; } - // Success. + // Get unused bytes. + unsigned char *unused_bytes = NULL; + size_t unused_bytes_size = 0; + result = tsi_handshaker_result_get_unused_bytes( + h->handshaker_result, &unused_bytes, &unused_bytes_size); // Create secure endpoint. - h->args->endpoint = grpc_secure_endpoint_create( - protector, h->args->endpoint, h->left_overs.slices, h->left_overs.count); - h->left_overs.count = 0; - h->left_overs.length = 0; - // Clear out the read buffer before it gets passed to the transport, - // since any excess bytes were already copied to h->left_overs. + if (unused_bytes_size > 0) { + grpc_slice slice = + grpc_slice_from_copied_buffer((char *)unused_bytes, unused_bytes_size); + h->args->endpoint = + grpc_secure_endpoint_create(protector, h->args->endpoint, &slice, 1); + grpc_slice_unref_internal(exec_ctx, slice); + } else { + h->args->endpoint = + grpc_secure_endpoint_create(protector, h->args->endpoint, NULL, 0); + } + tsi_handshaker_result_destroy(h->handshaker_result); + h->handshaker_result = NULL; + // Clear out the read buffer before it gets passed to the transport. grpc_slice_buffer_reset_and_unref_internal(exec_ctx, h->args->read_buffer); // Add auth context to channel args. grpc_arg auth_context_arg = grpc_auth_context_to_arg(h->auth_context); @@ -189,7 +200,8 @@ done: static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx, security_handshaker *h) { tsi_peer peer; - tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer); + tsi_result result = + tsi_handshaker_result_extract_peer(h->handshaker_result, &peer); if (result != TSI_OK) { return grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Peer extraction failed"), result); @@ -199,34 +211,87 @@ static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx, - security_handshaker *h) { - // Get data to send. - tsi_result result = TSI_OK; - size_t offset = 0; - do { - size_t to_send_size = h->handshake_buffer_size - offset; - result = tsi_handshaker_get_bytes_to_send_to_peer( - h->handshaker, h->handshake_buffer + offset, &to_send_size); - offset += to_send_size; - if (result == TSI_INCOMPLETE_DATA) { - h->handshake_buffer_size *= 2; - h->handshake_buffer = - gpr_realloc(h->handshake_buffer, h->handshake_buffer_size); - } - } while (result == TSI_INCOMPLETE_DATA); +static grpc_error *on_handshake_next_done_locked( + grpc_exec_ctx *exec_ctx, security_handshaker *h, tsi_result result, + const unsigned char *bytes_to_send, size_t bytes_to_send_size, + tsi_handshaker_result *handshaker_result) { + grpc_error *error = GRPC_ERROR_NONE; + // Read more if we need to. 
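+ // TSI_INCOMPLETE_DATA: the handshaker consumed all of its input
+ // without producing output and cannot make progress until the peer
+ // sends more bytes, so schedule another endpoint read.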
+ if (result == TSI_INCOMPLETE_DATA) { + GPR_ASSERT(bytes_to_send_size == 0); + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, + &h->on_handshake_data_received_from_peer); + return error; + } if (result != TSI_OK) { return grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result); } - // Send data. - grpc_slice to_send = - grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset); - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing); - grpc_slice_buffer_add(&h->outgoing, to_send); - grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, - &h->on_handshake_data_sent_to_peer); - return GRPC_ERROR_NONE; + // Update handshaker result. + if (handshaker_result != NULL) { + GPR_ASSERT(h->handshaker_result == NULL); + h->handshaker_result = handshaker_result; + } + if (bytes_to_send_size > 0) { + // Send data to peer, if needed. + grpc_slice to_send = grpc_slice_from_copied_buffer( + (const char *)bytes_to_send, bytes_to_send_size); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing); + grpc_slice_buffer_add(&h->outgoing, to_send); + grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, + &h->on_handshake_data_sent_to_peer); + } else if (handshaker_result == NULL) { + // There is nothing to send, but need to read from peer. + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, + &h->on_handshake_data_received_from_peer); + } else { + // Handshake has finished, check peer and so on. + error = check_peer_locked(exec_ctx, h); + } + return error; +} + +static void on_handshake_next_done_grpc_wrapper( + tsi_result result, void *user_data, const unsigned char *bytes_to_send, + size_t bytes_to_send_size, tsi_handshaker_result *handshaker_result) { + security_handshaker *h = user_data; + // This callback will be invoked by TSI in a non-grpc thread, so it's + // safe to create our own exec_ctx here. + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_lock(&h->mu); + grpc_error *error = + on_handshake_next_done_locked(&exec_ctx, h, result, bytes_to_send, + bytes_to_send_size, handshaker_result); + if (error != GRPC_ERROR_NONE) { + security_handshake_failed_locked(&exec_ctx, h, error); + gpr_mu_unlock(&h->mu); + security_handshaker_unref(&exec_ctx, h); + } else { + gpr_mu_unlock(&h->mu); + } + grpc_exec_ctx_finish(&exec_ctx); +} + +static grpc_error *do_handshaker_next_locked( + grpc_exec_ctx *exec_ctx, security_handshaker *h, + const unsigned char *bytes_received, size_t bytes_received_size) { + // Invoke TSI handshaker. + unsigned char *bytes_to_send = NULL; + size_t bytes_to_send_size = 0; + tsi_handshaker_result *handshaker_result = NULL; + tsi_result result = tsi_handshaker_next( + h->handshaker, bytes_received, bytes_received_size, &bytes_to_send, + &bytes_to_send_size, &handshaker_result, + &on_handshake_next_done_grpc_wrapper, h); + if (result == TSI_ASYNC) { + // Handshaker operating asynchronously. Nothing else to do here; + // callback will be invoked in a TSI thread. + return GRPC_ERROR_NONE; + } + // Handshaker returned synchronously. Invoke callback directly in + // this thread with our existing exec_ctx. 
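+ // (h->mu is already held on every path into this function, so the
+ // _locked handler can be invoked directly.)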
+ return on_handshake_next_done_locked(exec_ctx, h, result, bytes_to_send, + bytes_to_send_size, handshaker_result); } static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, @@ -241,72 +306,34 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, security_handshaker_unref(exec_ctx, h); return; } - // Process received data. - tsi_result result = TSI_OK; - size_t consumed_slice_size = 0; + // Copy all slices received. size_t i; + size_t bytes_received_size = 0; for (i = 0; i < h->args->read_buffer->count; i++) { - consumed_slice_size = GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]); - result = tsi_handshaker_process_bytes_from_peer( - h->handshaker, GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]), - &consumed_slice_size); - if (!tsi_handshaker_is_in_progress(h->handshaker)) break; + bytes_received_size += GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]); } - if (tsi_handshaker_is_in_progress(h->handshaker)) { - /* We may need more data. */ - if (result == TSI_INCOMPLETE_DATA) { - grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, - &h->on_handshake_data_received_from_peer); - goto done; - } else { - error = send_handshake_bytes_to_peer_locked(exec_ctx, h); - if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(exec_ctx, h, error); - gpr_mu_unlock(&h->mu); - security_handshaker_unref(exec_ctx, h); - return; - } - goto done; - } + if (bytes_received_size > h->handshake_buffer_size) { + h->handshake_buffer = gpr_realloc(h->handshake_buffer, bytes_received_size); + h->handshake_buffer_size = bytes_received_size; } - if (result != TSI_OK) { - security_handshake_failed_locked( - exec_ctx, h, - grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result)); - gpr_mu_unlock(&h->mu); - security_handshaker_unref(exec_ctx, h); - return; - } - /* Handshake is done and successful this point. */ - bool has_left_overs_in_current_slice = - (consumed_slice_size < - GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i])); - size_t num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + - h->args->read_buffer->count - i - 1; - if (num_left_overs > 0) { - /* Put the leftovers in our buffer (ownership transfered). */ - if (has_left_overs_in_current_slice) { - grpc_slice tail = grpc_slice_split_tail(&h->args->read_buffer->slices[i], - consumed_slice_size); - grpc_slice_buffer_add(&h->left_overs, tail); - /* split_tail above increments refcount. */ - grpc_slice_unref_internal(exec_ctx, tail); - } - grpc_slice_buffer_addn( - &h->left_overs, &h->args->read_buffer->slices[i + 1], - num_left_overs - (size_t)has_left_overs_in_current_slice); + size_t offset = 0; + for (i = 0; i < h->args->read_buffer->count; i++) { + size_t slice_size = GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]); + memcpy(h->handshake_buffer + offset, + GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]), slice_size); + offset += slice_size; } - // Check peer. - error = check_peer_locked(exec_ctx, h); + // Call TSI handshaker. 
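+ // The slices were flattened into h->handshake_buffer above because
+ // tsi_handshaker_next() takes a single contiguous byte buffer.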
+ error = do_handshaker_next_locked(exec_ctx, h, h->handshake_buffer, + bytes_received_size); + if (error != GRPC_ERROR_NONE) { security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); - return; + } else { + gpr_mu_unlock(&h->mu); } -done: - gpr_mu_unlock(&h->mu); } static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, @@ -321,8 +348,8 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, security_handshaker_unref(exec_ctx, h); return; } - /* We may be done. */ - if (tsi_handshaker_is_in_progress(h->handshaker)) { + // We may be done. + if (h->handshaker_result == NULL) { grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, &h->on_handshake_data_received_from_peer); } else { @@ -371,7 +398,7 @@ static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, h->args = args; h->on_handshake_done = on_handshake_done; gpr_ref(&h->refs); - grpc_error *error = send_handshake_bytes_to_peer_locked(exec_ctx, h); + grpc_error *error = do_handshaker_next_locked(exec_ctx, h, NULL, 0); if (error != GRPC_ERROR_NONE) { security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); @@ -404,7 +431,6 @@ static grpc_handshaker *security_handshaker_create( grpc_schedule_on_exec_ctx); grpc_closure_init(&h->on_peer_checked, on_peer_checked, h, grpc_schedule_on_exec_ctx); - grpc_slice_buffer_init(&h->left_overs); grpc_slice_buffer_init(&h->outgoing); return &h->base; } |
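For context, the round-robin pick paths above depend on get_next_ready_subchannel_index_locked(), which is defined earlier in this diff. A minimal sketch of the scan it performs, assuming a forward walk from the last picked index (demo_next_ready_index is a hypothetical name; the committed function is the authoritative version):

/* Hypothetical sketch, assuming a forward scan from the last pick,
 * wrapping modulo num_subchannels. Not the committed code. */
static size_t demo_next_ready_index(const round_robin_lb_policy *p) {
  for (size_t i = 0; i < p->num_subchannels; ++i) {
    const size_t index =
        (p->last_ready_subchannel_index + 1 + i) % p->num_subchannels;
    if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) {
      return index; /* first READY subchannel past the last pick */
    }
  }
  return p->num_subchannels; /* sentinel: no subchannel is READY */
}

Under this reading, the sentinel return value fails the `next_ready_index < p->num_subchannels` checks in rr_connectivity_changed_locked and rr_ping_one_locked, and initializing last_ready_subchannel_index to the final subchannel makes the very first scan start at index 0, as the comment in round_robin_create states.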