diff options
Diffstat (limited to 'src')
80 files changed, 2708 insertions, 851 deletions
diff --git a/src/core/ext/census/intrusive_hash_map.c b/src/core/ext/census/intrusive_hash_map.c new file mode 100644 index 0000000000..9f56b765e1 --- /dev/null +++ b/src/core/ext/census/intrusive_hash_map.c @@ -0,0 +1,320 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/census/intrusive_hash_map.h" +#include <string.h> + +extern bool hm_index_compare(const hm_index *A, const hm_index *B); + +/* Simple hashing function that takes lower 32 bits. */ +static __inline uint32_t chunked_vector_hasher(uint64_t key) { + return (uint32_t)key; +} + +/* Vector chunks are 1MiB divided by pointer size. */ +static const size_t VECTOR_CHUNK_SIZE = (1 << 20) / sizeof(void *); + +/* Helper functions which return buckets from the chunked vector. */ +static __inline void **get_mutable_bucket(const chunked_vector *buckets, + uint32_t index) { + if (index < VECTOR_CHUNK_SIZE) { + return &buckets->first_[index]; + } + size_t rest_index = (index - VECTOR_CHUNK_SIZE) / VECTOR_CHUNK_SIZE; + return &buckets->rest_[rest_index][index % VECTOR_CHUNK_SIZE]; +} + +static __inline void *get_bucket(const chunked_vector *buckets, + uint32_t index) { + if (index < VECTOR_CHUNK_SIZE) { + return buckets->first_[index]; + } + size_t rest_index = (index - VECTOR_CHUNK_SIZE) / VECTOR_CHUNK_SIZE; + return buckets->rest_[rest_index][index % VECTOR_CHUNK_SIZE]; +} + +/* Helper function. */ +static __inline size_t RestSize(const chunked_vector *vec) { + return (vec->size_ <= VECTOR_CHUNK_SIZE) + ? 0 + : (vec->size_ - VECTOR_CHUNK_SIZE - 1) / VECTOR_CHUNK_SIZE + 1; +} + +/* Initialize chunked vector to size of 0. */ +static void chunked_vector_init(chunked_vector *vec) { + vec->size_ = 0; + vec->first_ = NULL; + vec->rest_ = NULL; +} + +/* Clear chunked vector and free all memory that has been allocated then + initialize chunked vector. */ +static void chunked_vector_clear(chunked_vector *vec) { + if (vec->first_ != NULL) { + gpr_free(vec->first_); + } + if (vec->rest_ != NULL) { + size_t rest_size = RestSize(vec); + for (size_t i = 0; i < rest_size; ++i) { + if (vec->rest_[i] != NULL) { + gpr_free(vec->rest_[i]); + } + } + gpr_free(vec->rest_); + } + chunked_vector_init(vec); +} + +/* Clear chunked vector and then resize it to n entries. Allow the first 1MB to + be read w/o an extra cache miss. The rest of the elements are stored in an + array of arrays to avoid large mallocs. */ +static void chunked_vector_reset(chunked_vector *vec, size_t n) { + chunked_vector_clear(vec); + vec->size_ = n; + if (n <= VECTOR_CHUNK_SIZE) { + vec->first_ = (void **)gpr_malloc(sizeof(void *) * n); + memset(vec->first_, 0, sizeof(void *) * n); + } else { + vec->first_ = (void **)gpr_malloc(sizeof(void *) * VECTOR_CHUNK_SIZE); + memset(vec->first_, 0, sizeof(void *) * VECTOR_CHUNK_SIZE); + size_t rest_size = RestSize(vec); + vec->rest_ = (void ***)gpr_malloc(sizeof(void **) * rest_size); + memset(vec->rest_, 0, sizeof(void **) * rest_size); + int i = 0; + n -= VECTOR_CHUNK_SIZE; + while (n > 0) { + size_t this_size = GPR_MIN(n, VECTOR_CHUNK_SIZE); + vec->rest_[i] = (void **)gpr_malloc(sizeof(void *) * this_size); + memset(vec->rest_[i], 0, sizeof(void *) * this_size); + n -= this_size; + ++i; + } + } +} + +void intrusive_hash_map_init(intrusive_hash_map *hash_map, + uint32_t initial_log2_table_size) { + hash_map->log2_num_buckets = initial_log2_table_size; + hash_map->num_items = 0; + uint32_t num_buckets = (uint32_t)1 << hash_map->log2_num_buckets; + hash_map->extend_threshold = num_buckets >> 1; + chunked_vector_init(&hash_map->buckets); + chunked_vector_reset(&hash_map->buckets, num_buckets); + hash_map->hash_mask = num_buckets - 1; +} + +bool intrusive_hash_map_empty(const intrusive_hash_map *hash_map) { + return hash_map->num_items == 0; +} + +size_t intrusive_hash_map_size(const intrusive_hash_map *hash_map) { + return hash_map->num_items; +} + +void intrusive_hash_map_end(const intrusive_hash_map *hash_map, hm_index *idx) { + idx->bucket_index = (uint32_t)hash_map->buckets.size_; + GPR_ASSERT(idx->bucket_index <= UINT32_MAX); + idx->item = NULL; +} + +void intrusive_hash_map_next(const intrusive_hash_map *hash_map, + hm_index *idx) { + idx->item = idx->item->hash_link; + while (idx->item == NULL) { + idx->bucket_index++; + if (idx->bucket_index >= hash_map->buckets.size_) { + /* Reached end of table. */ + idx->item = NULL; + return; + } + idx->item = (hm_item *)get_bucket(&hash_map->buckets, idx->bucket_index); + } +} + +void intrusive_hash_map_begin(const intrusive_hash_map *hash_map, + hm_index *idx) { + for (uint32_t i = 0; i < hash_map->buckets.size_; ++i) { + if (get_bucket(&hash_map->buckets, i) != NULL) { + idx->bucket_index = i; + idx->item = (hm_item *)get_bucket(&hash_map->buckets, i); + return; + } + } + intrusive_hash_map_end(hash_map, idx); +} + +hm_item *intrusive_hash_map_find(const intrusive_hash_map *hash_map, + uint64_t key) { + uint32_t index = chunked_vector_hasher(key) & hash_map->hash_mask; + + hm_item *p = (hm_item *)get_bucket(&hash_map->buckets, index); + while (p != NULL) { + if (key == p->key) { + return p; + } + p = p->hash_link; + } + return NULL; +} + +hm_item *intrusive_hash_map_erase(intrusive_hash_map *hash_map, uint64_t key) { + uint32_t index = chunked_vector_hasher(key) & hash_map->hash_mask; + + hm_item **slot = (hm_item **)get_mutable_bucket(&hash_map->buckets, index); + hm_item *p = *slot; + if (p == NULL) { + return NULL; + } + + if (key == p->key) { + *slot = p->hash_link; + p->hash_link = NULL; + hash_map->num_items--; + return p; + } + + hm_item *prev = p; + p = p->hash_link; + + while (p) { + if (key == p->key) { + prev->hash_link = p->hash_link; + p->hash_link = NULL; + hash_map->num_items--; + return p; + } + prev = p; + p = p->hash_link; + } + return NULL; +} + +/* Insert an hm_item* into the underlying chunked vector. hash_mask is + * array_size-1. Returns true if it is a new hm_item and false if the hm_item + * already existed. + */ +static __inline bool intrusive_hash_map_internal_insert(chunked_vector *buckets, + uint32_t hash_mask, + hm_item *item) { + const uint64_t key = item->key; + uint32_t index = chunked_vector_hasher(key) & hash_mask; + hm_item **slot = (hm_item **)get_mutable_bucket(buckets, index); + hm_item *p = *slot; + item->hash_link = p; + + /* Check to see if key already exists. */ + while (p) { + if (p->key == key) { + return false; + } + p = p->hash_link; + } + + /* Otherwise add new entry. */ + *slot = item; + return true; +} + +/* Extend the allocated number of elements in the hash map by a factor of 2. */ +void intrusive_hash_map_extend(intrusive_hash_map *hash_map) { + uint32_t new_log2_num_buckets = 1 + hash_map->log2_num_buckets; + uint32_t new_num_buckets = (uint32_t)1 << new_log2_num_buckets; + GPR_ASSERT(new_num_buckets <= UINT32_MAX && new_num_buckets > 0); + chunked_vector new_buckets; + chunked_vector_init(&new_buckets); + chunked_vector_reset(&new_buckets, new_num_buckets); + uint32_t new_hash_mask = new_num_buckets - 1; + + hm_index cur_idx; + hm_index end_idx; + intrusive_hash_map_end(hash_map, &end_idx); + intrusive_hash_map_begin(hash_map, &cur_idx); + while (!hm_index_compare(&cur_idx, &end_idx)) { + hm_item *new_item = cur_idx.item; + intrusive_hash_map_next(hash_map, &cur_idx); + intrusive_hash_map_internal_insert(&new_buckets, new_hash_mask, new_item); + } + + /* Set values for new chunked_vector. extend_threshold is set to half of + * new_num_buckets. */ + hash_map->log2_num_buckets = new_log2_num_buckets; + chunked_vector_clear(&hash_map->buckets); + hash_map->buckets = new_buckets; + hash_map->hash_mask = new_hash_mask; + hash_map->extend_threshold = new_num_buckets >> 1; +} + +/* Insert a hm_item. The hm_item must remain live until it is removed from the + table. This object does not take the ownership of hm_item. The caller must + remove this hm_item from the table and delete it before this table is + deleted. If hm_item exists already num_items is not changed. */ +bool intrusive_hash_map_insert(intrusive_hash_map *hash_map, hm_item *item) { + if (hash_map->num_items >= hash_map->extend_threshold) { + intrusive_hash_map_extend(hash_map); + } + if (intrusive_hash_map_internal_insert(&hash_map->buckets, + hash_map->hash_mask, item)) { + hash_map->num_items++; + return true; + } + return false; +} + +void intrusive_hash_map_clear(intrusive_hash_map *hash_map, + void (*free_object)(void *)) { + hm_index cur; + hm_index end; + intrusive_hash_map_end(hash_map, &end); + intrusive_hash_map_begin(hash_map, &cur); + + while (!hm_index_compare(&cur, &end)) { + hm_index next = cur; + intrusive_hash_map_next(hash_map, &next); + if (cur.item != NULL) { + hm_item *item = intrusive_hash_map_erase(hash_map, cur.item->key); + (*free_object)((void *)item); + gpr_free(item); + } + cur = next; + } +} + +void intrusive_hash_map_free(intrusive_hash_map *hash_map, + void (*free_object)(void *)) { + intrusive_hash_map_clear(hash_map, (*free_object)); + hash_map->num_items = 0; + hash_map->extend_threshold = 0; + hash_map->log2_num_buckets = 0; + hash_map->hash_mask = 0; + chunked_vector_clear(&hash_map->buckets); +} diff --git a/src/core/ext/census/intrusive_hash_map.h b/src/core/ext/census/intrusive_hash_map.h new file mode 100644 index 0000000000..e316bf4b16 --- /dev/null +++ b/src/core/ext/census/intrusive_hash_map.h @@ -0,0 +1,167 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H +#define GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H + +#include "src/core/ext/census/intrusive_hash_map_internal.h" + +/* intrusive_hash_map is a fast chained hash table. This hash map is faster than + * a dense hash map when the application calls insert and erase more often than + * find. When the workload is dominated by find() a dense hash map may be + * faster. + * + * intrusive_hash_map uses an intrusive header placed within a user defined + * struct. The header field IHM_key MUST be set to a valid value before + * insertion into the hash map or undefined behavior may occur. The header field + * IHM_hash_link MUST to be set to NULL initially. + * + * EXAMPLE USAGE: + * + * typedef struct string_item { + * INTRUSIVE_HASH_MAP_HEADER; + * // User data. + * char *str_buf; + * uint16_t len; + * } string_item; + * + * static string_item *make_string_item(uint64_t key, const char *buf, + * uint16_t len) { + * string_item *item = (string_item *)gpr_malloc(sizeof(string_item)); + * item->IHM_key = key; + * item->IHM_hash_link = NULL; + * item->len = len; + * item->str_buf = (char *)malloc(len); + * memcpy(item->str_buf, buf, len); + * return item; + * } + * + * intrusive_hash_map hash_map; + * intrusive_hash_map_init(&hash_map, 4); + * string_item *new_item1 = make_string_item(10, "test1", 5); + * bool ok = intrusive_hash_map_insert(&hash_map, (hm_item *)new_item1); + * + * string_item *item1 = + * (string_item *)intrusive_hash_map_find(&hash_map, 10); + */ + +/* Hash map item. Stores key and a pointer to the actual object. A user defined + * version of this can be passed in provided the first 2 entries (key and + * hash_link) are the same. These entries must be first in the user defined + * struct. Pointer to struct will need to be cast as (hm_item *) when passed to + * hash map. This allows it to be intrusive. */ +typedef struct hm_item { + uint64_t key; + struct hm_item *hash_link; + /* Optional user defined data after this. */ +} hm_item; + +/* Macro provided for ease of use. This must be first in the user defined + * struct (i.e. uint64_t key and hm_item * must be the first two elements in + * that order). */ +#define INTRUSIVE_HASH_MAP_HEADER \ + uint64_t IHM_key; \ + struct hm_item *IHM_hash_link + +/* Index struct which acts as a pseudo-iterator within the hash map. */ +typedef struct hm_index { + uint32_t bucket_index; // hash map bucket index. + hm_item *item; // Pointer to hm_item within the hash map. +} hm_index; + +/* Returns true if two hm_indices point to the same object within the hash map + * and false otherwise. */ +__inline bool hm_index_compare(const hm_index *A, const hm_index *B) { + return (A->item == B->item && A->bucket_index == B->bucket_index); +} + +/* + * Helper functions for iterating over the hash map. + */ + +/* On return idx will contain an invalid index which is always equal to + * hash_map->buckets.size_ */ +void intrusive_hash_map_end(const intrusive_hash_map *hash_map, hm_index *idx); + +/* Iterates index to the next valid entry in the hash map and stores the + * index within idx. If end of table is reached, idx will contain the same + * values as if intrusive_hash_map_end() was called. */ +void intrusive_hash_map_next(const intrusive_hash_map *hash_map, hm_index *idx); + +/* On return, idx will contain the index of the first non-null entry in the hash + * map. If the hash map is empty, idx will contain the same values as if + * intrusive_hash_map_end() was called. */ +void intrusive_hash_map_begin(const intrusive_hash_map *hash_map, + hm_index *idx); + +/* Initialize intrusive hash map data structure. This must be called before + * the hash map can be used. The initial size of an intrusive hash map will be + * 2^initial_log2_map_size (valid range is [0, 31]). */ +void intrusive_hash_map_init(intrusive_hash_map *hash_map, + uint32_t initial_log2_map_size); + +/* Returns true if the hash map is empty and false otherwise. */ +bool intrusive_hash_map_empty(const intrusive_hash_map *hash_map); + +/* Returns the number of elements currently in the hash map. */ +size_t intrusive_hash_map_size(const intrusive_hash_map *hash_map); + +/* Find a hm_item within the hash map by key. Returns NULL if item was not + * found. */ +hm_item *intrusive_hash_map_find(const intrusive_hash_map *hash_map, + uint64_t key); + +/* Erase the hm_item that corresponds with key. If the hm_item is found, return + * the pointer to the hm_item. Else returns NULL. */ +hm_item *intrusive_hash_map_erase(intrusive_hash_map *hash_map, uint64_t key); + +/* Attempts to insert a new hm_item into the hash map. If an element with the + * same key already exists, it will not insert the new item and return false. + * Otherwise, it will insert the new item and return true. */ +bool intrusive_hash_map_insert(intrusive_hash_map *hash_map, hm_item *item); + +/* Clears entire contents of the hash map, but leaves internal data structure + * untouched. Second argument takes a function pointer to a function that will + * free the object designated by the user and pointed to by hash_map->value. */ +void intrusive_hash_map_clear(intrusive_hash_map *hash_map, + void (*free_object)(void *)); + +/* Erase all contents of hash map and free the memory. Hash map is invalid + * after calling this function and cannot be used until it has been + * reinitialized (intrusive_hash_map_init()). This function takes a function + * pointer to a function that will free the object designated by the user and + * pointed to by hash_map->value. */ +void intrusive_hash_map_free(intrusive_hash_map *hash_map, + void (*free_object)(void *)); + +#endif /* GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_H */ diff --git a/src/core/ext/census/intrusive_hash_map_internal.h b/src/core/ext/census/intrusive_hash_map_internal.h new file mode 100644 index 0000000000..76a9a3a722 --- /dev/null +++ b/src/core/ext/census/intrusive_hash_map_internal.h @@ -0,0 +1,63 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H +#define GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> +#include <stdbool.h> + +/* The chunked vector is a data structure that allocates buckets for use in the + * hash map. ChunkedVector is logically equivalent to T*[N] (cast void* as + * T*). It's internally implemented as an array of 1MB arrays to avoid + * allocating large consecutive memory chunks. This is an internal data + * structure that should never be accessed directly. */ +typedef struct chunked_vector { + size_t size_; + void **first_; + void ***rest_; +} chunked_vector; + +/* Core intrusive hash map data structure. All internal elements are managed by + * functions and should not be altered manually. */ +typedef struct intrusive_hash_map { + uint32_t num_items; + uint32_t extend_threshold; + uint32_t log2_num_buckets; + uint32_t hash_mask; + chunked_vector buckets; +} intrusive_hash_map; + +#endif /* GRPC_CORE_EXT_CENSUS_INTRUSIVE_HASH_MAP_INTERNAL_H */ diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index f83670db82..04666edbec 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -67,9 +67,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( typedef enum { WAITING, - CALLING_BACK, + READY_TO_CALL_BACK, CALLING_BACK_AND_FINISHED, - CALLED_BACK } callback_phase; typedef struct { @@ -77,11 +76,13 @@ typedef struct { callback_phase phase; grpc_closure on_complete; grpc_closure on_timeout; + grpc_closure watcher_timer_init; grpc_timer alarm; grpc_connectivity_state state; grpc_completion_queue *cq; grpc_cq_completion completion_storage; grpc_channel *channel; + grpc_error *error; void *tag; } state_watcher; @@ -105,11 +106,8 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, gpr_mu_lock(&w->mu); switch (w->phase) { case WAITING: - case CALLED_BACK: + case READY_TO_CALL_BACK: GPR_UNREACHABLE_CODE(return ); - case CALLING_BACK: - w->phase = CALLED_BACK; - break; case CALLING_BACK_AND_FINISHED: delete = 1; break; @@ -123,10 +121,14 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, bool due_to_completion, grpc_error *error) { - int delete = 0; - if (due_to_completion) { grpc_timer_cancel(exec_ctx, &w->alarm); + } else { + grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(w->channel)); + grpc_client_channel_watch_connectivity_state(exec_ctx, client_channel_elem, + grpc_cq_pollset(w->cq), NULL, + &w->on_complete, NULL); } gpr_mu_lock(&w->mu); @@ -147,25 +149,27 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, } switch (w->phase) { case WAITING: - w->phase = CALLING_BACK; - grpc_cq_end_op(exec_ctx, w->cq, w->tag, GRPC_ERROR_REF(error), - finished_completion, w, &w->completion_storage); + GRPC_ERROR_REF(error); + w->error = error; + w->phase = READY_TO_CALL_BACK; break; - case CALLING_BACK: + case READY_TO_CALL_BACK: + if (error != GRPC_ERROR_NONE) { + GPR_ASSERT(!due_to_completion); + GRPC_ERROR_UNREF(w->error); + GRPC_ERROR_REF(error); + w->error = error; + } w->phase = CALLING_BACK_AND_FINISHED; + grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w, + &w->completion_storage); break; case CALLING_BACK_AND_FINISHED: GPR_UNREACHABLE_CODE(return ); - case CALLED_BACK: - delete = 1; break; } gpr_mu_unlock(&w->mu); - if (delete) { - delete_state_watcher(exec_ctx, w); - } - GRPC_ERROR_UNREF(error); } @@ -179,6 +183,28 @@ static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error)); } +int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + return grpc_client_channel_num_external_connectivity_watchers( + client_channel_elem); +} + +typedef struct watcher_timer_init_arg { + state_watcher *w; + gpr_timespec deadline; +} watcher_timer_init_arg; + +static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg; + + grpc_timer_init(exec_ctx, &wa->w->alarm, + gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC), + &wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_free(wa); +} + void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { @@ -208,16 +234,19 @@ void grpc_channel_watch_connectivity_state( w->cq = cq; w->tag = tag; w->channel = channel; + w->error = NULL; - grpc_timer_init(&exec_ctx, &w->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + watcher_timer_init_arg *wa = gpr_malloc(sizeof(watcher_timer_init_arg)); + wa->w = w; + wa->deadline = deadline; + grpc_closure_init(&w->watcher_timer_init, watcher_timer_init, wa, + grpc_schedule_on_exec_ctx); if (client_channel_elem->filter == &grpc_client_channel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); - grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem, - grpc_cq_pollset(cq), &w->state, - &w->on_complete); + grpc_client_channel_watch_connectivity_state( + &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state, + &w->on_complete, &w->watcher_timer_init); } else { abort(); } diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index f2f27b9175..8cebbe9eca 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -167,6 +167,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) { return value; } +struct external_connectivity_watcher; + /************************************************************************* * CHANNEL-WIDE FUNCTIONS */ @@ -204,6 +206,11 @@ typedef struct client_channel_channel_data { /** interested parties (owned) */ grpc_pollset_set *interested_parties; + /* external_connectivity_watcher_list head is guarded by its own mutex, since + * counts need to be grabbed immediately without polling on a cq */ + gpr_mu external_connectivity_watcher_list_mu; + struct external_connectivity_watcher *external_connectivity_watcher_list_head; + /* the following properties are guarded by a mutex since API's require them to be instantaneously available */ gpr_mu info_mu; @@ -661,6 +668,12 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, // Initialize data members. chand->combiner = grpc_combiner_create(NULL); gpr_mu_init(&chand->info_mu); + gpr_mu_init(&chand->external_connectivity_watcher_list_mu); + + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + chand->external_connectivity_watcher_list_head = NULL; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, on_resolver_result_changed_locked, chand, @@ -749,6 +762,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_pollset_set_destroy(exec_ctx, chand->interested_parties); GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel"); gpr_mu_destroy(&chand->info_mu); + gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu); } /************************************************************************* @@ -1431,14 +1445,79 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( return out; } -typedef struct { +typedef struct external_connectivity_watcher { channel_data *chand; grpc_pollset *pollset; grpc_closure *on_complete; + grpc_closure *watcher_timer_init; grpc_connectivity_state *state; grpc_closure my_closure; + struct external_connectivity_watcher *next; } external_connectivity_watcher; +static external_connectivity_watcher *lookup_external_connectivity_watcher( + channel_data *chand, grpc_closure *on_complete) { + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL && w->on_complete != on_complete) { + w = w->next; + } + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return w; +} + +static void external_connectivity_watcher_list_append( + channel_data *chand, external_connectivity_watcher *w) { + GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); + + gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); + GPR_ASSERT(!w->next); + w->next = chand->external_connectivity_watcher_list_head; + chand->external_connectivity_watcher_list_head = w; + gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu); +} + +static void external_connectivity_watcher_list_remove( + channel_data *chand, external_connectivity_watcher *too_remove) { + GPR_ASSERT( + lookup_external_connectivity_watcher(chand, too_remove->on_complete)); + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + if (too_remove == chand->external_connectivity_watcher_list_head) { + chand->external_connectivity_watcher_list_head = too_remove->next; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return; + } + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL) { + if (w->next == too_remove) { + w->next = w->next->next; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return; + } + w = w->next; + } + GPR_UNREACHABLE_CODE(return ); +} + +int grpc_client_channel_num_external_connectivity_watchers( + grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + int count = 0; + + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL) { + count++; + w = w->next; + } + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + + return count; +} + static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { external_connectivity_watcher *w = arg; @@ -1447,6 +1526,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, w->pollset); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); + external_connectivity_watcher_list_remove(w->chand, w); gpr_free(w); grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); } @@ -1454,21 +1534,42 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { external_connectivity_watcher *w = arg; - grpc_closure_init(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); + external_connectivity_watcher *found = NULL; + if (w->state != NULL) { + external_connectivity_watcher_list_append(w->chand, w); + grpc_closure_run(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); + } else { + GPR_ASSERT(w->watcher_timer_init == NULL); + found = lookup_external_connectivity_watcher(w->chand, w->on_complete); + if (found) { + GPR_ASSERT(found->on_complete == w->on_complete); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure); + } + grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, + w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + "external_connectivity_watcher"); + gpr_free(w); + } } void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *closure) { + grpc_connectivity_state *state, grpc_closure *closure, + grpc_closure *watcher_timer_init) { channel_data *chand = elem->channel_data; - external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + external_connectivity_watcher *w = gpr_zalloc(sizeof(*w)); w->chand = chand; w->pollset = pollset; w->on_complete = closure; w->state = state; + w->watcher_timer_init = watcher_timer_init; + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 8d2490ea55..356a7ab0c1 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -53,9 +53,13 @@ extern const grpc_channel_filter grpc_client_channel_filter; grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); +int grpc_client_channel_num_external_connectivity_watchers( + grpc_channel_element *elem); + void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete); + grpc_connectivity_state *state, grpc_closure *on_complete, + grpc_closure *watcher_timer_init); /* Debug helper: pull the subchannel call from a call stack element */ grpc_subchannel_call *grpc_client_channel_get_subchannel_call( diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index b9c62c376a..2c076e821c 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -127,6 +127,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, gpr_mu_lock(&state->mu); if (state->shutdown) { gpr_mu_unlock(&state->mu); + grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_NONE); grpc_endpoint_destroy(exec_ctx, tcp); gpr_free(acceptor); return; diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 0ac2c2ad52..7012ffe568 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -105,7 +105,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_error *error) { grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent, req->context->pollset_set); - grpc_closure_sched(exec_ctx, req->on_done, GRPC_ERROR_REF(error)); + grpc_closure_sched(exec_ctx, req->on_done, error); grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); @@ -244,7 +244,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { internal_request *req = arg; if (error != GRPC_ERROR_NONE) { - finish(exec_ctx, req, error); + finish(exec_ctx, req, GRPC_ERROR_REF(error)); return; } req->next_address = 0; diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 76946434f0..ea7c1122c1 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -44,6 +44,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" #include "src/core/tsi/ssl_transport_security.h" +#include "src/core/tsi/transport_security_adapter.h" typedef struct { grpc_channel_security_connector base; @@ -78,7 +79,8 @@ static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx, } grpc_handshake_manager_add( handshake_mgr, - grpc_security_handshaker_create(exec_ctx, handshaker, &sc->base)); + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(handshaker), &sc->base)); } static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 30431a4e4a..416a3bdb35 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -56,6 +56,7 @@ #include "src/core/lib/support/string.h" #include "src/core/tsi/fake_transport_security.h" #include "src/core/tsi/ssl_transport_security.h" +#include "src/core/tsi/transport_security_adapter.h" /* -- Constants. -- */ @@ -390,7 +391,8 @@ static void fake_channel_add_handshakers( grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_create_fake_handshaker(true /* is_client */), + exec_ctx, tsi_create_adapter_handshaker( + tsi_create_fake_handshaker(true /* is_client */)), &sc->base)); } @@ -400,7 +402,8 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx, grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_create_fake_handshaker(false /* is_client */), + exec_ctx, tsi_create_adapter_handshaker( + tsi_create_fake_handshaker(false /* is_client */)), &sc->base)); } @@ -495,8 +498,10 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx, } // Create handshakers. - grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_hs, &sc->base)); + grpc_handshake_manager_add( + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base)); } static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx, @@ -515,8 +520,10 @@ static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx, } // Create handshakers. - grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_hs, &sc->base)); + grpc_handshake_manager_add( + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base)); } static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) { diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 509b4b556d..3bc113e20f 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -71,12 +71,12 @@ typedef struct { unsigned char *handshake_buffer; size_t handshake_buffer_size; - grpc_slice_buffer left_overs; grpc_slice_buffer outgoing; grpc_closure on_handshake_data_sent_to_peer; grpc_closure on_handshake_data_received_from_peer; grpc_closure on_peer_checked; grpc_auth_context *auth_context; + tsi_handshaker_result *handshaker_result; } security_handshaker; static void security_handshaker_unref(grpc_exec_ctx *exec_ctx, @@ -84,6 +84,7 @@ static void security_handshaker_unref(grpc_exec_ctx *exec_ctx, if (gpr_unref(&h->refs)) { gpr_mu_destroy(&h->mu); tsi_handshaker_destroy(h->handshaker); + tsi_handshaker_result_destroy(h->handshaker_result); if (h->endpoint_to_destroy != NULL) { grpc_endpoint_destroy(exec_ctx, h->endpoint_to_destroy); } @@ -92,7 +93,6 @@ static void security_handshaker_unref(grpc_exec_ctx *exec_ctx, gpr_free(h->read_buffer_to_destroy); } gpr_free(h->handshake_buffer); - grpc_slice_buffer_destroy_internal(exec_ctx, &h->left_overs); grpc_slice_buffer_destroy_internal(exec_ctx, &h->outgoing); GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake"); GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, h->connector, "handshake"); @@ -150,10 +150,10 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); goto done; } - // Get frame protector. + // Create frame protector. tsi_frame_protector *protector; - tsi_result result = - tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector); + tsi_result result = tsi_handshaker_result_create_frame_protector( + h->handshaker_result, NULL, &protector); if (result != TSI_OK) { error = grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"), @@ -161,14 +161,25 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, security_handshake_failed_locked(exec_ctx, h, error); goto done; } - // Success. + // Get unused bytes. + unsigned char *unused_bytes = NULL; + size_t unused_bytes_size = 0; + result = tsi_handshaker_result_get_unused_bytes( + h->handshaker_result, &unused_bytes, &unused_bytes_size); // Create secure endpoint. - h->args->endpoint = grpc_secure_endpoint_create( - protector, h->args->endpoint, h->left_overs.slices, h->left_overs.count); - h->left_overs.count = 0; - h->left_overs.length = 0; - // Clear out the read buffer before it gets passed to the transport, - // since any excess bytes were already copied to h->left_overs. + if (unused_bytes_size > 0) { + grpc_slice slice = + grpc_slice_from_copied_buffer((char *)unused_bytes, unused_bytes_size); + h->args->endpoint = + grpc_secure_endpoint_create(protector, h->args->endpoint, &slice, 1); + grpc_slice_unref_internal(exec_ctx, slice); + } else { + h->args->endpoint = + grpc_secure_endpoint_create(protector, h->args->endpoint, NULL, 0); + } + tsi_handshaker_result_destroy(h->handshaker_result); + h->handshaker_result = NULL; + // Clear out the read buffer before it gets passed to the transport. grpc_slice_buffer_reset_and_unref_internal(exec_ctx, h->args->read_buffer); // Add auth context to channel args. grpc_arg auth_context_arg = grpc_auth_context_to_arg(h->auth_context); @@ -189,7 +200,8 @@ done: static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx, security_handshaker *h) { tsi_peer peer; - tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer); + tsi_result result = + tsi_handshaker_result_extract_peer(h->handshaker_result, &peer); if (result != TSI_OK) { return grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Peer extraction failed"), result); @@ -199,34 +211,87 @@ static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx, - security_handshaker *h) { - // Get data to send. - tsi_result result = TSI_OK; - size_t offset = 0; - do { - size_t to_send_size = h->handshake_buffer_size - offset; - result = tsi_handshaker_get_bytes_to_send_to_peer( - h->handshaker, h->handshake_buffer + offset, &to_send_size); - offset += to_send_size; - if (result == TSI_INCOMPLETE_DATA) { - h->handshake_buffer_size *= 2; - h->handshake_buffer = - gpr_realloc(h->handshake_buffer, h->handshake_buffer_size); - } - } while (result == TSI_INCOMPLETE_DATA); +static grpc_error *on_handshake_next_done_locked( + grpc_exec_ctx *exec_ctx, security_handshaker *h, tsi_result result, + const unsigned char *bytes_to_send, size_t bytes_to_send_size, + tsi_handshaker_result *handshaker_result) { + grpc_error *error = GRPC_ERROR_NONE; + // Read more if we need to. + if (result == TSI_INCOMPLETE_DATA) { + GPR_ASSERT(bytes_to_send_size == 0); + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, + &h->on_handshake_data_received_from_peer); + return error; + } if (result != TSI_OK) { return grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result); } - // Send data. - grpc_slice to_send = - grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset); - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing); - grpc_slice_buffer_add(&h->outgoing, to_send); - grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, - &h->on_handshake_data_sent_to_peer); - return GRPC_ERROR_NONE; + // Update handshaker result. + if (handshaker_result != NULL) { + GPR_ASSERT(h->handshaker_result == NULL); + h->handshaker_result = handshaker_result; + } + if (bytes_to_send_size > 0) { + // Send data to peer, if needed. + grpc_slice to_send = grpc_slice_from_copied_buffer( + (const char *)bytes_to_send, bytes_to_send_size); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing); + grpc_slice_buffer_add(&h->outgoing, to_send); + grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, + &h->on_handshake_data_sent_to_peer); + } else if (handshaker_result == NULL) { + // There is nothing to send, but need to read from peer. + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, + &h->on_handshake_data_received_from_peer); + } else { + // Handshake has finished, check peer and so on. + error = check_peer_locked(exec_ctx, h); + } + return error; +} + +static void on_handshake_next_done_grpc_wrapper( + tsi_result result, void *user_data, const unsigned char *bytes_to_send, + size_t bytes_to_send_size, tsi_handshaker_result *handshaker_result) { + security_handshaker *h = user_data; + // This callback will be invoked by TSI in a non-grpc thread, so it's + // safe to create our own exec_ctx here. + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_lock(&h->mu); + grpc_error *error = + on_handshake_next_done_locked(&exec_ctx, h, result, bytes_to_send, + bytes_to_send_size, handshaker_result); + if (error != GRPC_ERROR_NONE) { + security_handshake_failed_locked(&exec_ctx, h, error); + gpr_mu_unlock(&h->mu); + security_handshaker_unref(&exec_ctx, h); + } else { + gpr_mu_unlock(&h->mu); + } + grpc_exec_ctx_finish(&exec_ctx); +} + +static grpc_error *do_handshaker_next_locked( + grpc_exec_ctx *exec_ctx, security_handshaker *h, + const unsigned char *bytes_received, size_t bytes_received_size) { + // Invoke TSI handshaker. + unsigned char *bytes_to_send = NULL; + size_t bytes_to_send_size = 0; + tsi_handshaker_result *handshaker_result = NULL; + tsi_result result = tsi_handshaker_next( + h->handshaker, bytes_received, bytes_received_size, &bytes_to_send, + &bytes_to_send_size, &handshaker_result, + &on_handshake_next_done_grpc_wrapper, h); + if (result == TSI_ASYNC) { + // Handshaker operating asynchronously. Nothing else to do here; + // callback will be invoked in a TSI thread. + return GRPC_ERROR_NONE; + } + // Handshaker returned synchronously. Invoke callback directly in + // this thread with our existing exec_ctx. + return on_handshake_next_done_locked(exec_ctx, h, result, bytes_to_send, + bytes_to_send_size, handshaker_result); } static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, @@ -241,72 +306,34 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, security_handshaker_unref(exec_ctx, h); return; } - // Process received data. - tsi_result result = TSI_OK; - size_t consumed_slice_size = 0; + // Copy all slices received. size_t i; + size_t bytes_received_size = 0; for (i = 0; i < h->args->read_buffer->count; i++) { - consumed_slice_size = GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]); - result = tsi_handshaker_process_bytes_from_peer( - h->handshaker, GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]), - &consumed_slice_size); - if (!tsi_handshaker_is_in_progress(h->handshaker)) break; + bytes_received_size += GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]); } - if (tsi_handshaker_is_in_progress(h->handshaker)) { - /* We may need more data. */ - if (result == TSI_INCOMPLETE_DATA) { - grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, - &h->on_handshake_data_received_from_peer); - goto done; - } else { - error = send_handshake_bytes_to_peer_locked(exec_ctx, h); - if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(exec_ctx, h, error); - gpr_mu_unlock(&h->mu); - security_handshaker_unref(exec_ctx, h); - return; - } - goto done; - } + if (bytes_received_size > h->handshake_buffer_size) { + h->handshake_buffer = gpr_realloc(h->handshake_buffer, bytes_received_size); + h->handshake_buffer_size = bytes_received_size; } - if (result != TSI_OK) { - security_handshake_failed_locked( - exec_ctx, h, - grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result)); - gpr_mu_unlock(&h->mu); - security_handshaker_unref(exec_ctx, h); - return; - } - /* Handshake is done and successful this point. */ - bool has_left_overs_in_current_slice = - (consumed_slice_size < - GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i])); - size_t num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + - h->args->read_buffer->count - i - 1; - if (num_left_overs > 0) { - /* Put the leftovers in our buffer (ownership transfered). */ - if (has_left_overs_in_current_slice) { - grpc_slice tail = grpc_slice_split_tail(&h->args->read_buffer->slices[i], - consumed_slice_size); - grpc_slice_buffer_add(&h->left_overs, tail); - /* split_tail above increments refcount. */ - grpc_slice_unref_internal(exec_ctx, tail); - } - grpc_slice_buffer_addn( - &h->left_overs, &h->args->read_buffer->slices[i + 1], - num_left_overs - (size_t)has_left_overs_in_current_slice); + size_t offset = 0; + for (i = 0; i < h->args->read_buffer->count; i++) { + size_t slice_size = GPR_SLICE_LENGTH(h->args->read_buffer->slices[i]); + memcpy(h->handshake_buffer + offset, + GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]), slice_size); + offset += slice_size; } - // Check peer. - error = check_peer_locked(exec_ctx, h); + // Call TSI handshaker. + error = do_handshaker_next_locked(exec_ctx, h, h->handshake_buffer, + bytes_received_size); + if (error != GRPC_ERROR_NONE) { security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); - return; + } else { + gpr_mu_unlock(&h->mu); } -done: - gpr_mu_unlock(&h->mu); } static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, @@ -321,8 +348,8 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, security_handshaker_unref(exec_ctx, h); return; } - /* We may be done. */ - if (tsi_handshaker_is_in_progress(h->handshaker)) { + // We may be done. + if (h->handshaker_result == NULL) { grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, &h->on_handshake_data_received_from_peer); } else { @@ -371,7 +398,7 @@ static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, h->args = args; h->on_handshake_done = on_handshake_done; gpr_ref(&h->refs); - grpc_error *error = send_handshake_bytes_to_peer_locked(exec_ctx, h); + grpc_error *error = do_handshaker_next_locked(exec_ctx, h, NULL, 0); if (error != GRPC_ERROR_NONE) { security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); @@ -404,7 +431,6 @@ static grpc_handshaker *security_handshaker_create( grpc_schedule_on_exec_ctx); grpc_closure_init(&h->on_peer_checked, on_peer_checked, h, grpc_schedule_on_exec_ctx); - grpc_slice_buffer_init(&h->left_overs); grpc_slice_buffer_init(&h->outgoing); return &h->base; } diff --git a/src/cpp/common/version_cc.cc b/src/cpp/common/version_cc.cc index 72a4c4cf94..e4f5cb8422 100644 --- a/src/cpp/common/version_cc.cc +++ b/src/cpp/common/version_cc.cc @@ -37,5 +37,5 @@ #include <grpc++/grpc++.h> namespace grpc { -grpc::string Version() { return "1.4.0-dev"; } +grpc::string Version() { return "1.5.0-dev"; } } diff --git a/src/csharp/.editorconfig b/src/csharp/.editorconfig new file mode 100644 index 0000000000..7bc2bcce18 --- /dev/null +++ b/src/csharp/.editorconfig @@ -0,0 +1,7 @@ +root = true +[**] +end_of_line = LF +indent_style = space +indent_size = 4 +insert_final_newline = true +tab_width = 4 diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.csproj b/src/csharp/Grpc.Auth/Grpc.Auth.csproj index 6ac25aa1f0..188ddb95b9 100755 --- a/src/csharp/Grpc.Auth/Grpc.Auth.csproj +++ b/src/csharp/Grpc.Auth/Grpc.Auth.csproj @@ -16,6 +16,8 @@ <PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl> <PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl> <NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion> + <IncludeSymbols>true</IncludeSymbols> + <IncludeSource>true</IncludeSource> </PropertyGroup> <ItemGroup> @@ -23,7 +25,9 @@ </ItemGroup> <ItemGroup> - <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" /> + <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj"> + <PrivateAssets>None</PrivateAssets> + </ProjectReference> </ItemGroup> <ItemGroup> diff --git a/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj b/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj index f4dd5105fc..45ec874322 100755 --- a/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj +++ b/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj @@ -1,4 +1,4 @@ -<Project Sdk="Microsoft.NET.Sdk"> +<Project Sdk="Microsoft.NET.Sdk"> <Import Project="..\Grpc.Core\Version.csproj.include" /> <Import Project="..\Grpc.Core\Common.csproj.include" /> @@ -16,6 +16,8 @@ <PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl> <PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl> <NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion> + <IncludeSymbols>true</IncludeSymbols> + <IncludeSource>true</IncludeSource> </PropertyGroup> <ItemGroup> @@ -23,7 +25,9 @@ </ItemGroup> <ItemGroup> - <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" /> + <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj"> + <PrivateAssets>None</PrivateAssets> + </ProjectReference> </ItemGroup> <ItemGroup Condition=" '$(TargetFramework)' == 'net45' "> diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 4f29c35b32..51ae11fbde 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -59,6 +59,8 @@ namespace Grpc.Core readonly ChannelSafeHandle handle; readonly Dictionary<string, ChannelOption> options; + readonly Task connectivityWatcherTask; + bool shutdownRequested; /// <summary> @@ -99,6 +101,9 @@ namespace Grpc.Core this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs); } } + // TODO(jtattermusch): Workaround for https://github.com/GoogleCloudPlatform/google-cloud-dotnet/issues/822. + // Remove once retries are supported in C core + this.connectivityWatcherTask = RunConnectivityWatcherAsync(); GrpcEnvironment.RegisterChannel(this); } @@ -244,7 +249,7 @@ namespace Grpc.Core handle.Dispose(); - await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false); + await Task.WhenAll(GrpcEnvironment.ReleaseAsync(), connectivityWatcherTask).ConfigureAwait(false); } internal ChannelSafeHandle Handle @@ -299,6 +304,40 @@ namespace Grpc.Core } } + /// <summary> + /// Constantly Watches channel connectivity status to work around https://github.com/GoogleCloudPlatform/google-cloud-dotnet/issues/822 + /// </summary> + private async Task RunConnectivityWatcherAsync() + { + try + { + var lastState = State; + while (lastState != ChannelState.Shutdown) + { + lock (myLock) + { + if (shutdownRequested) + { + break; + } + } + + try + { + await WaitForStateChangedAsync(lastState, DateTime.UtcNow.AddSeconds(1)).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // ignore timeout + } + lastState = State; + } + } + catch (ObjectDisposedException) { + // during shutdown, channel is going to be disposed. + } + } + private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options) { var key = ChannelOptions.PrimaryUserAgentString; diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index c0865001a8..ae0d8b2c8d 100755 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -1,4 +1,4 @@ -<Project Sdk="Microsoft.NET.Sdk"> +<Project Sdk="Microsoft.NET.Sdk"> <Import Project="Version.csproj.include" /> <Import Project="Common.csproj.include" /> @@ -15,6 +15,8 @@ <PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl> <PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl> <NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion> + <IncludeSymbols>true</IncludeSymbols> + <IncludeSource>true</IncludeSource> </PropertyGroup> <ItemGroup> diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 8ed0c0b92f..bc74e212b1 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -218,5 +218,16 @@ namespace Grpc.Core.Internal { return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; } + + /// <summary> + /// Only for testing. + /// </summary> + public static CallSafeHandle CreateFake(IntPtr ptr, CompletionQueueSafeHandle cq) + { + var call = new CallSafeHandle(); + call.SetHandle(ptr); + call.Initialize(cq); + return call; + } } } diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs index a4aa8d3ffe..075286d33e 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -52,6 +52,7 @@ namespace Grpc.Core.Internal readonly GrpcEnvironment environment; readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>(new IntPtrComparer()); + IntPtr lastRegisteredKey; // only for testing public CompletionRegistry(GrpcEnvironment environment) { @@ -62,6 +63,7 @@ namespace Grpc.Core.Internal { environment.DebugStats.PendingBatchCompletions.Increment(); GrpcPreconditions.CheckState(dict.TryAdd(key, callback)); + this.lastRegisteredKey = key; } public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback) @@ -84,6 +86,14 @@ namespace Grpc.Core.Internal return value; } + /// <summary> + /// For testing purposes only. + /// </summary> + public IntPtr LastRegisteredKey + { + get { return this.lastRegisteredKey; } + } + private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback) { try diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index 696987d2a8..e703e3e6ce 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -164,6 +164,8 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_test_callback_delegate grpcsharp_test_callback; public readonly Delegates.grpcsharp_test_nop_delegate grpcsharp_test_nop; + public readonly Delegates.grpcsharp_test_override_method_delegate grpcsharp_test_override_method; + #endregion public NativeMethods(UnmanagedLibrary library) @@ -278,6 +280,7 @@ namespace Grpc.Core.Internal this.grpcsharp_test_callback = GetMethodDelegate<Delegates.grpcsharp_test_callback_delegate>(library); this.grpcsharp_test_nop = GetMethodDelegate<Delegates.grpcsharp_test_nop_delegate>(library); + this.grpcsharp_test_override_method = GetMethodDelegate<Delegates.grpcsharp_test_override_method_delegate>(library); } /// <summary> @@ -434,6 +437,7 @@ namespace Grpc.Core.Internal public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback); public delegate IntPtr grpcsharp_test_nop_delegate(IntPtr ptr); + public delegate void grpcsharp_test_override_method_delegate(string methodName, string variant); } } } diff --git a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs index 77ac347c7d..fe757820fd 100644 --- a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs +++ b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs @@ -59,8 +59,14 @@ using System.Runtime.CompilerServices; "0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" + "27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" + "71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")] +[assembly: InternalsVisibleTo("Grpc.Microbenchmarks,PublicKey=" + + "00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" + + "0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" + + "27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" + + "71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")] #else [assembly: InternalsVisibleTo("Grpc.Core.Tests")] [assembly: InternalsVisibleTo("Grpc.Core.Testing")] [assembly: InternalsVisibleTo("Grpc.IntegrationTesting")] +[assembly: InternalsVisibleTo("Grpc.Microbenchmarks")] #endif diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include index 8388bfd9cc..81156452f3 100755 --- a/src/csharp/Grpc.Core/Version.csproj.include +++ b/src/csharp/Grpc.Core/Version.csproj.include @@ -1,7 +1,7 @@ <!-- This file is generated --> <Project> <PropertyGroup> - <GrpcCsharpVersion>1.4.0-dev</GrpcCsharpVersion> + <GrpcCsharpVersion>1.5.0-dev</GrpcCsharpVersion> <GoogleProtobufVersion>3.3.0</GoogleProtobufVersion> </PropertyGroup> </Project> diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index 2e55d9d80e..d507878c2d 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -48,11 +48,11 @@ namespace Grpc.Core /// <summary> /// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies /// </summary> - public const string CurrentAssemblyFileVersion = "1.4.0.0"; + public const string CurrentAssemblyFileVersion = "1.5.0.0"; /// <summary> /// Current version of gRPC C# /// </summary> - public const string CurrentVersion = "1.4.0-dev"; + public const string CurrentVersion = "1.5.0-dev"; } } diff --git a/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj b/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj index eac6e1fc95..c3791a4e6b 100755 --- a/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj +++ b/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj @@ -15,6 +15,8 @@ <PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl> <PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl> <NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion> + <IncludeSymbols>true</IncludeSymbols> + <IncludeSource>true</IncludeSource> </PropertyGroup> <ItemGroup> @@ -22,7 +24,9 @@ </ItemGroup> <ItemGroup> - <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" /> + <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj"> + <PrivateAssets>None</PrivateAssets> + </ProjectReference> </ItemGroup> <ItemGroup> diff --git a/src/csharp/Grpc.Microbenchmarks/.gitignore b/src/csharp/Grpc.Microbenchmarks/.gitignore new file mode 100644 index 0000000000..1746e3269e --- /dev/null +++ b/src/csharp/Grpc.Microbenchmarks/.gitignore @@ -0,0 +1,2 @@ +bin +obj diff --git a/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj new file mode 100644 index 0000000000..26a940e488 --- /dev/null +++ b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj @@ -0,0 +1,28 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <Import Project="..\Grpc.Core\Version.csproj.include" /> + <Import Project="..\Grpc.Core\Common.csproj.include" /> + + <PropertyGroup> + <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks> + <AssemblyName>Grpc.Microbenchmarks</AssemblyName> + <OutputType>Exe</OutputType> + <PackageId>Grpc.Microbenchmarks</PackageId> + <PackageTargetFallback Condition=" '$(TargetFramework)' == 'netcoreapp1.0' ">$(PackageTargetFallback);portable-net45</PackageTargetFallback> + <RuntimeFrameworkVersion Condition=" '$(TargetFramework)' == 'netcoreapp1.0' ">1.0.4</RuntimeFrameworkVersion> + </PropertyGroup> + + <ItemGroup> + <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" /> + </ItemGroup> + + <ItemGroup Condition=" '$(TargetFramework)' == 'net45' "> + <Reference Include="System" /> + <Reference Include="Microsoft.CSharp" /> + </ItemGroup> + + <ItemGroup> + <Compile Include="..\Grpc.Core\Version.cs" /> + </ItemGroup> + +</Project> diff --git a/src/csharp/Grpc.Microbenchmarks/Program.cs b/src/csharp/Grpc.Microbenchmarks/Program.cs new file mode 100644 index 0000000000..a0ca1f75ae --- /dev/null +++ b/src/csharp/Grpc.Microbenchmarks/Program.cs @@ -0,0 +1,55 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Logging; + +namespace Grpc.Microbenchmarks +{ + class Program + { + public static void Main(string[] args) + { + GrpcEnvironment.SetLogger(new TextWriterLogger(Console.Error)); + var benchmark = new SendMessageBenchmark(); + benchmark.Init(); + foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12}) + { + benchmark.Run(threadCount, 4 * 1000 * 1000, 0); + } + benchmark.Cleanup(); + } + } +} diff --git a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs new file mode 100644 index 0000000000..eea375824f --- /dev/null +++ b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs @@ -0,0 +1,106 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading; +using Grpc.Core; +using Grpc.Core.Internal; +using System.Collections.Generic; +using System.Diagnostics; + +namespace Grpc.Microbenchmarks +{ + public class SendMessageBenchmark + { + static readonly NativeMethods Native = NativeMethods.Get(); + + GrpcEnvironment environment; + + public void Init() + { + Native.grpcsharp_test_override_method("grpcsharp_call_start_batch", "nop"); + environment = GrpcEnvironment.AddRef(); + } + + public void Cleanup() + { + GrpcEnvironment.ReleaseAsync().Wait(); + // TODO(jtattermusch): track GC stats + } + + public void Run(int threadCount, int iterations, int payloadSize) + { + Console.WriteLine(string.Format("SendMessageBenchmark: threads={0}, iterations={1}, payloadSize={2}", threadCount, iterations, payloadSize)); + var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, payloadSize)); + threadedBenchmark.Run(); + } + + private void ThreadBody(int iterations, int payloadSize) + { + // TODO(jtattermusch): parametrize by number of pending completions. + // TODO(jtattermusch): parametrize by cached/non-cached BatchContextSafeHandle + + var completionRegistry = new CompletionRegistry(environment); + var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry); + var call = CreateFakeCall(cq); + + var sendCompletionHandler = new SendCompletionHandler((success) => { }); + var payload = new byte[payloadSize]; + var writeFlags = default(WriteFlags); + + var stopwatch = Stopwatch.StartNew(); + for (int i = 0; i < iterations; i++) + { + call.StartSendMessage(sendCompletionHandler, payload, writeFlags, false); + var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey); + callback(true); + } + stopwatch.Stop(); + Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds); + + cq.Dispose(); + } + + private static CallSafeHandle CreateFakeCall(CompletionQueueSafeHandle cq) + { + var call = CallSafeHandle.CreateFake(new IntPtr(0xdead), cq); + bool success = false; + while (!success) + { + // avoid calling destroy on a nonexistent grpc_call pointer + call.DangerousAddRef(ref success); + } + return call; + } + } +} diff --git a/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs new file mode 100644 index 0000000000..1c54624034 --- /dev/null +++ b/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs @@ -0,0 +1,79 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading; +using Grpc.Core; +using Grpc.Core.Internal; +using System.Collections.Generic; +using System.Diagnostics; + +namespace Grpc.Microbenchmarks +{ + public class ThreadedBenchmark + { + List<ThreadStart> runners; + + public ThreadedBenchmark(IEnumerable<ThreadStart> runners) + { + this.runners = new List<ThreadStart>(runners); + } + + public ThreadedBenchmark(int threadCount, Action threadBody) + { + this.runners = new List<ThreadStart>(); + for (int i = 0; i < threadCount; i++) + { + this.runners.Add(new ThreadStart(() => threadBody())); + } + } + + public void Run() + { + Console.WriteLine("Running threads."); + var threads = new List<Thread>(); + for (int i = 0; i < runners.Count; i++) + { + var thread = new Thread(runners[i]); + thread.Start(); + threads.Add(thread); + } + + foreach (var thread in threads) + { + thread.Join(); + } + Console.WriteLine("All threads finished."); + } + } +} diff --git a/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj b/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj index 70bfcc89c5..3a07555248 100755 --- a/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj +++ b/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj @@ -15,6 +15,8 @@ <PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl> <PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl> <NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion> + <IncludeSymbols>true</IncludeSymbols> + <IncludeSource>true</IncludeSource> </PropertyGroup> <ItemGroup> @@ -22,7 +24,9 @@ </ItemGroup> <ItemGroup> - <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" /> + <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj"> + <PrivateAssets>None</PrivateAssets> + </ProjectReference> </ItemGroup> <ItemGroup> diff --git a/src/csharp/Grpc.sln b/src/csharp/Grpc.sln index beab3ccb36..d9a7b8d556 100644 --- a/src/csharp/Grpc.sln +++ b/src/csharp/Grpc.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26228.4 +VisualStudioVersion = 15.0.26430.4 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Core", "Grpc.Core\Grpc.Core.csproj", "{BD878CB3-BDB4-46AB-84EF-C3B4729F56BC}" EndProject @@ -37,6 +37,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Reflection", "Grpc.Ref EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Reflection.Tests", "Grpc.Reflection.Tests\Grpc.Reflection.Tests.csproj", "{335AD0A2-F2CC-4C2E-853C-26174206BEE7}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Microbenchmarks", "Grpc.Microbenchmarks\Grpc.Microbenchmarks.csproj", "{84C17746-4727-4290-8E8B-A380793DAE1E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -111,6 +113,10 @@ Global {335AD0A2-F2CC-4C2E-853C-26174206BEE7}.Debug|Any CPU.Build.0 = Debug|Any CPU {335AD0A2-F2CC-4C2E-853C-26174206BEE7}.Release|Any CPU.ActiveCfg = Release|Any CPU {335AD0A2-F2CC-4C2E-853C-26174206BEE7}.Release|Any CPU.Build.0 = Release|Any CPU + {84C17746-4727-4290-8E8B-A380793DAE1E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {84C17746-4727-4290-8E8B-A380793DAE1E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {84C17746-4727-4290-8E8B-A380793DAE1E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {84C17746-4727-4290-8E8B-A380793DAE1E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat index d823942be5..35664cc762 100755 --- a/src/csharp/build_packages_dotnetcli.bat +++ b/src/csharp/build_packages_dotnetcli.bat @@ -28,7 +28,7 @@ @rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. @rem Current package versions -set VERSION=1.4.0-dev +set VERSION=1.5.0-dev @rem Adjust the location of nuget.exe set NUGET=C:\nuget\nuget.exe @@ -51,11 +51,11 @@ powershell -Command "cp -r ..\..\platform=*\artifacts\protoc_* protoc_plugins" @rem To be able to build, we also need to put grpc_csharp_ext to its normal location xcopy /Y /I nativelibs\csharp_ext_windows_x64\grpc_csharp_ext.dll ..\..\cmake\build\x64\Release\ -%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Core --output ..\..\..\artifacts || goto :error -%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Core.Testing --output ..\..\..\artifacts || goto :error -%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Auth --output ..\..\..\artifacts || goto :error -%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.HealthCheck --output ..\..\..\artifacts || goto :error -%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Reflection --output ..\..\..\artifacts || goto :error +%DOTNET% pack --configuration Release Grpc.Core --output ..\..\..\artifacts || goto :error +%DOTNET% pack --configuration Release Grpc.Core.Testing --output ..\..\..\artifacts || goto :error +%DOTNET% pack --configuration Release Grpc.Auth --output ..\..\..\artifacts || goto :error +%DOTNET% pack --configuration Release Grpc.HealthCheck --output ..\..\..\artifacts || goto :error +%DOTNET% pack --configuration Release Grpc.Reflection --output ..\..\..\artifacts || goto :error %NUGET% pack Grpc.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts || goto :error %NUGET% pack Grpc.Tools.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh index f79c97fbbc..7dc07a220d 100755 --- a/src/csharp/build_packages_dotnetcli.sh +++ b/src/csharp/build_packages_dotnetcli.sh @@ -48,13 +48,13 @@ dotnet restore Grpc.sln mkdir -p ../../libs/opt cp nativelibs/csharp_ext_linux_x64/libgrpc_csharp_ext.so ../../libs/opt -dotnet pack --configuration Release --include-symbols --include-source Grpc.Core --output ../../../artifacts -dotnet pack --configuration Release --include-symbols --include-source Grpc.Core.Testing --output ../../../artifacts -dotnet pack --configuration Release --include-symbols --include-source Grpc.Auth --output ../../../artifacts -dotnet pack --configuration Release --include-symbols --include-source Grpc.HealthCheck --output ../../../artifacts -dotnet pack --configuration Release --include-symbols --include-source Grpc.Reflection --output ../../../artifacts - -nuget pack Grpc.nuspec -Version "1.4.0-dev" -OutputDirectory ../../artifacts -nuget pack Grpc.Tools.nuspec -Version "1.4.0-dev" -OutputDirectory ../../artifacts +dotnet pack --configuration Release Grpc.Core --output ../../../artifacts +dotnet pack --configuration Release Grpc.Core.Testing --output ../../../artifacts +dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts +dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts +dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts + +nuget pack Grpc.nuspec -Version "1.5.0-dev" -OutputDirectory ../../artifacts +nuget pack Grpc.Tools.nuspec -Version "1.5.0-dev" -OutputDirectory ../../artifacts (cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg) diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index f6cff454bd..a56113eca3 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -529,6 +529,38 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) { grpc_call_unref(call); } +typedef grpc_call_error (*grpcsharp_call_start_batch_func)(grpc_call *call, + const grpc_op *ops, + size_t nops, + void *tag, + void *reserved); + +/* Only for testing */ +static grpc_call_error grpcsharp_call_start_batch_nop(grpc_call *call, + const grpc_op *ops, + size_t nops, void *tag, + void *reserved) { + return GRPC_CALL_OK; +} + +static grpc_call_error grpcsharp_call_start_batch_default(grpc_call *call, + const grpc_op *ops, + size_t nops, + void *tag, + void *reserved) { + return grpc_call_start_batch(call, ops, nops, tag, reserved); +} + +static grpcsharp_call_start_batch_func g_call_start_batch_func = + grpcsharp_call_start_batch_default; + +static grpc_call_error grpcsharp_call_start_batch(grpc_call *call, + const grpc_op *ops, + size_t nops, void *tag, + void *reserved) { + return g_call_start_batch_func(call, ops, nops, tag, reserved); +} + GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary( grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len, uint32_t write_flags, @@ -576,8 +608,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary( ops[5].flags = 0; ops[5].reserved = NULL; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), + ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming( @@ -616,8 +648,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming( ops[3].flags = 0; ops[3].reserved = NULL; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), + ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( @@ -656,8 +688,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( ops[3].flags = 0; ops[3].reserved = NULL; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), + ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_duplex_streaming( @@ -685,8 +717,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_duplex_streaming( ops[1].flags = 0; ops[1].reserved = NULL; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), + ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata( @@ -699,8 +731,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata( ops[0].flags = 0; ops[0].reserved = NULL; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), + ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message( @@ -720,7 +752,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message( ops[1].flags = 0; ops[1].reserved = NULL; - return grpc_call_start_batch(call, ops, nops, ctx, NULL); + return grpcsharp_call_start_batch(call, ops, nops, ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_close_from_client( @@ -731,8 +763,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_close_from_client( ops[0].flags = 0; ops[0].reserved = NULL; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), + ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( @@ -773,7 +805,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( ops[nops].reserved = NULL; nops++; } - return grpc_call_start_batch(call, ops, nops, ctx, NULL); + return grpcsharp_call_start_batch(call, ops, nops, ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -784,8 +816,8 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) { ops[0].data.recv_message.recv_message = &(ctx->recv_message); ops[0].flags = 0; ops[0].reserved = NULL; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), + ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -798,8 +830,8 @@ grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) { ops[0].flags = 0; ops[0].reserved = NULL; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), + ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_initial_metadata( @@ -817,8 +849,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_initial_metadata( ops[0].flags = 0; ops[0].reserved = NULL; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpcsharp_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), + ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -1092,3 +1124,17 @@ GPR_EXPORT void *GPR_CALLTYPE grpcsharp_test_nop(void *ptr) { return ptr; } GPR_EXPORT int32_t GPR_CALLTYPE grpcsharp_sizeof_grpc_event(void) { return sizeof(grpc_event); } + +/* Override a method for testing */ +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_test_override_method(const char *method_name, const char *variant) { + if (strcmp("grpcsharp_call_start_batch", method_name) == 0) { + if (strcmp("nop", variant) == 0) { + g_call_start_batch_func = grpcsharp_call_start_batch_nop; + } else { + GPR_ASSERT(0); + } + } else { + GPR_ASSERT(0); + } +} diff --git a/src/node/README.md b/src/node/README.md index 4b906643bc..3b98b97879 100644 --- a/src/node/README.md +++ b/src/node/README.md @@ -2,9 +2,9 @@ # Node.js gRPC Library ## PREREQUISITES -- `node`: This requires `node` to be installed, version `0.12` or above. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package. +- `node`: This requires `node` to be installed, version `4.0` or above. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package. -- **Note:** If you installed `node` via a package manager and the version is still less than `0.12`, try directly installing it from [nodejs.org](https://nodejs.org). +- **Note:** If you installed `node` via a package manager and the version is still less than `4.0`, try directly installing it from [nodejs.org](https://nodejs.org). ## INSTALLATION @@ -16,7 +16,7 @@ npm install grpc ## BUILD FROM SOURCE 1. Clone [the grpc Git Repository](https://github.com/grpc/grpc). - 2. Run `npm install` from the repository root. + 2. Run `npm install --build-from-source` from the repository root. - **Note:** On Windows, this might fail due to [nodejs issue #4932](https://github.com/nodejs/node/issues/4932) in which case, you will see something like the following in `npm install`'s output (towards the very beginning): @@ -34,61 +34,3 @@ npm install grpc ## TESTING To run the test suite, simply run `npm test` in the install location. - -## API -This library internally uses [ProtoBuf.js](https://github.com/dcodeIO/ProtoBuf.js), and some structures it exports match those exported by that library. - -If you require this module, you will get an object with the following members - -```javascript -function load(filename) -``` - -Takes a filename of a [Protocol Buffer](https://developers.google.com/protocol-buffers/) file, and returns an object representing the structure of the protocol buffer in the following way: - - - Namespaces become maps from the names of their direct members to those member objects - - Service definitions become client constructors for clients for that service. They also have a `service` member that can be used for constructing servers. - - Message definitions become Message constructors like those that ProtoBuf.js would create - - Enum definitions become Enum objects like those that ProtoBuf.js would create - - Anything else becomes the relevant reflection object that ProtoBuf.js would create - - -```javascript -function loadObject(reflectionObject) -``` - -Returns the same structure that `load` returns, but takes a reflection object from `ProtoBuf.js` instead of a file name. - -```javascript -function Server([serverOptions]) -``` - -Constructs a server to which service/implementation pairs can be added. - - -```javascript -status -``` - -An object mapping status names to status code numbers. - - -```javascript -callError -``` - -An object mapping call error names to codes. This is primarily useful for tracking down certain kinds of internal errors. - - -```javascript -Credentials -``` - -An object with factory methods for creating credential objects for clients. - - -```javascript -ServerCredentials -``` - -An object with factory methods for creating credential objects for servers. diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 49179ab359..9453000ad3 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -508,9 +508,14 @@ void Call::DestroyCall() { } Call::Call(grpc_call *call) - : wrapped_call(call), pending_batches(0), has_final_op_completed(false) {} + : wrapped_call(call), pending_batches(0), has_final_op_completed(false) { + peer = grpc_call_get_peer(call); +} -Call::~Call() { DestroyCall(); } +Call::~Call() { + DestroyCall(); + gpr_free(peer); +} void Call::Init(Local<Object> exports) { HandleScope scope; @@ -662,6 +667,16 @@ NAN_METHOD(Call::StartBatch) { } Local<Function> callback_func = info[1].As<Function>(); Call *call = ObjectWrap::Unwrap<Call>(info.This()); + if (call->wrapped_call == NULL) { + /* This implies that the call has completed and has been destroyed. To + * emulate + * previous behavior, we should call the callback immediately with an error, + * as though the batch had failed in core */ + Local<Value> argv[] = { + Nan::Error("The async function failed because the call has completed")}; + Nan::Call(callback_func, Nan::New<Object>(), 1, argv); + return; + } Local<Object> obj = Nan::To<Object>(info[0]).ToLocalChecked(); Local<Array> keys = Nan::GetOwnPropertyNames(obj).ToLocalChecked(); size_t nops = keys->Length(); @@ -727,6 +742,11 @@ NAN_METHOD(Call::Cancel) { return Nan::ThrowTypeError("cancel can only be called on Call objects"); } Call *call = ObjectWrap::Unwrap<Call>(info.This()); + if (call->wrapped_call == NULL) { + /* Cancel is supposed to be idempotent. If the call has already finished, + * cancel should just complete silently */ + return; + } grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL); if (error != GRPC_CALL_OK) { return Nan::ThrowError(nanErrorWithCode("cancel failed", error)); @@ -747,6 +767,11 @@ NAN_METHOD(Call::CancelWithStatus) { "cancelWithStatus's second argument must be a string"); } Call *call = ObjectWrap::Unwrap<Call>(info.This()); + if (call->wrapped_call == NULL) { + /* Cancel is supposed to be idempotent. If the call has already finished, + * cancel should just complete silently */ + return; + } grpc_status_code code = static_cast<grpc_status_code>(Nan::To<uint32_t>(info[0]).FromJust()); if (code == GRPC_STATUS_OK) { @@ -763,9 +788,7 @@ NAN_METHOD(Call::GetPeer) { return Nan::ThrowTypeError("getPeer can only be called on Call objects"); } Call *call = ObjectWrap::Unwrap<Call>(info.This()); - char *peer = grpc_call_get_peer(call->wrapped_call); - Local<Value> peer_value = Nan::New(peer).ToLocalChecked(); - gpr_free(peer); + Local<Value> peer_value = Nan::New(call->peer).ToLocalChecked(); info.GetReturnValue().Set(peer_value); } @@ -780,6 +803,10 @@ NAN_METHOD(Call::SetCredentials) { "setCredentials' first argument must be a CallCredentials"); } Call *call = ObjectWrap::Unwrap<Call>(info.This()); + if (call->wrapped_call == NULL) { + return Nan::ThrowError( + "Cannot set credentials on a call that has already started"); + } CallCredentials *creds_object = ObjectWrap::Unwrap<CallCredentials>( Nan::To<Object>(info[0]).ToLocalChecked()); grpc_call_credentials *creds = creds_object->GetWrappedCredentials(); diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 0bd24f56a9..8f751279e4 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -96,6 +96,7 @@ class Call : public Nan::ObjectWrap { call, this is GRPC_OP_RECV_STATUS_ON_CLIENT and for a server call, this is GRPC_OP_SEND_STATUS_FROM_SERVER */ bool has_final_op_completed; + char *peer; }; class Op { diff --git a/src/node/health_check/package.json b/src/node/health_check/package.json index 37c9b7a54f..0922f54a39 100644 --- a/src/node/health_check/package.json +++ b/src/node/health_check/package.json @@ -1,6 +1,6 @@ { "name": "grpc-health-check", - "version": "1.4.0-dev", + "version": "1.5.0-dev", "author": "Google Inc.", "description": "Health check service for use with gRPC", "repository": { @@ -15,7 +15,7 @@ } ], "dependencies": { - "grpc": "^1.4.0-dev", + "grpc": "^1.5.0-dev", "lodash": "^3.9.3", "google-protobuf": "^3.0.0" }, diff --git a/src/node/index.js b/src/node/index.js index 2da77c3eae..177628e22d 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -1,5 +1,5 @@ -/* - * +/** + * @license * Copyright 2015, Google Inc. * All rights reserved. * @@ -31,10 +31,6 @@ * */ -/** - * @module - */ - 'use strict'; var path = require('path'); @@ -64,24 +60,30 @@ var constants = require('./src/constants.js'); grpc.setDefaultRootsPem(fs.readFileSync(SSL_ROOTS_PATH, 'ascii')); /** - * Load a ProtoBuf.js object as a gRPC object. The options object can provide - * the following options: - * - binaryAsBase64: deserialize bytes values as base64 strings instead of - * Buffers. Defaults to false - * - longsAsStrings: deserialize long values as strings instead of objects. - * Defaults to true - * - deprecatedArgumentOrder: Use the beta method argument order for client - * methods, with optional arguments after the callback. Defaults to false. - * This option is only a temporary stopgap measure to smooth an API breakage. - * It is deprecated, and new code should not use it. - * - protobufjsVersion: Available values are 5, 6, and 'detect'. 5 and 6 - * respectively indicate that an object from the corresponding version of - * ProtoBuf.js is provided in the value argument. If the option is 'detect', - * gRPC will guess what the version is based on the structure of the value. - * Defaults to 'detect'. + * @namespace grpc + */ + +/** + * Load a ProtoBuf.js object as a gRPC object. + * @memberof grpc + * @alias grpc.loadObject * @param {Object} value The ProtoBuf.js reflection object to load * @param {Object=} options Options to apply to the loaded file - * @return {Object<string, *>} The resulting gRPC object + * @param {bool=} [options.binaryAsBase64=false] deserialize bytes values as + * base64 strings instead of Buffers + * @param {bool=} [options.longsAsStrings=true] deserialize long values as + * strings instead of objects + * @param {bool=} [options.enumsAsStrings=true] deserialize enum values as + * strings instead of numbers. Only works with Protobuf.js 6 values. + * @param {bool=} [options.deprecatedArgumentOrder=false] use the beta method + * argument order for client methods, with optional arguments after the + * callback. This option is only a temporary stopgap measure to smooth an + * API breakage. It is deprecated, and new code should not use it. + * @param {(number|string)=} [options.protobufjsVersion='detect'] 5 and 6 + * respectively indicate that an object from the corresponding version of + * Protobuf.js is provided in the value argument. If the option is 'detect', + * gRPC wll guess what the version is based on the structure of the value. + * @return {Object<string, *>} The resulting gRPC object. */ exports.loadObject = function loadObject(value, options) { options = _.defaults(options, common.defaultGrpcOptions); @@ -112,22 +114,23 @@ exports.loadObject = function loadObject(value, options) { var loadObject = exports.loadObject; /** - * Load a gRPC object from a .proto file. The options object can provide the - * following options: - * - convertFieldsToCamelCase: Load this file with field names in camel case - * instead of their original case - * - binaryAsBase64: deserialize bytes values as base64 strings instead of - * Buffers. Defaults to false - * - longsAsStrings: deserialize long values as strings instead of objects. - * Defaults to true - * - deprecatedArgumentOrder: Use the beta method argument order for client - * methods, with optional arguments after the callback. Defaults to false. - * This option is only a temporary stopgap measure to smooth an API breakage. - * It is deprecated, and new code should not use it. + * Load a gRPC object from a .proto file. + * @memberof grpc + * @alias grpc.load * @param {string|{root: string, file: string}} filename The file to load * @param {string=} format The file format to expect. Must be either 'proto' or * 'json'. Defaults to 'proto' * @param {Object=} options Options to apply to the loaded file + * @param {bool=} [options.convertFieldsToCamelCase=false] Load this file with + * field names in camel case instead of their original case + * @param {bool=} [options.binaryAsBase64=false] deserialize bytes values as + * base64 strings instead of Buffers + * @param {bool=} [options.longsAsStrings=true] deserialize long values as + * strings instead of objects + * @param {bool=} [options.deprecatedArgumentOrder=false] use the beta method + * argument order for client methods, with optional arguments after the + * callback. This option is only a temporary stopgap measure to smooth an + * API breakage. It is deprecated, and new code should not use it. * @return {Object<string, *>} The resulting gRPC object */ exports.load = function load(filename, format, options) { @@ -168,6 +171,8 @@ var log_template = _.template( * called. Note: the output format here is intended to be informational, and * is not guaranteed to stay the same in the future. * Logs will be directed to logger.error. + * @memberof grpc + * @alias grpc.setLogger * @param {Console} logger A Console-like object. */ exports.setLogger = function setLogger(logger) { @@ -187,6 +192,8 @@ exports.setLogger = function setLogger(logger) { /** * Sets the logger verbosity for gRPC module logging. The options are members * of the grpc.logVerbosity map. + * @memberof grpc + * @alias grpc.setLogVerbosity * @param {Number} verbosity The minimum severity to log */ exports.setLogVerbosity = function setLogVerbosity(verbosity) { @@ -194,71 +201,70 @@ exports.setLogVerbosity = function setLogVerbosity(verbosity) { grpc.setLogVerbosity(verbosity); }; -/** - * @see module:src/server.Server - */ exports.Server = server.Server; -/** - * @see module:src/metadata - */ exports.Metadata = Metadata; -/** - * Status name to code number mapping - */ exports.status = constants.status; -/** - * Propagate flag name to number mapping - */ exports.propagate = constants.propagate; -/** - * Call error name to code number mapping - */ exports.callError = constants.callError; -/** - * Write flag name to code number mapping - */ exports.writeFlags = constants.writeFlags; -/** - * Log verbosity setting name to code number mapping - */ exports.logVerbosity = constants.logVerbosity; -/** - * Credentials factories - */ exports.credentials = require('./src/credentials.js'); /** * ServerCredentials factories + * @constructor ServerCredentials + * @memberof grpc */ exports.ServerCredentials = grpc.ServerCredentials; /** - * @see module:src/client.makeClientConstructor + * Create insecure server credentials + * @name grpc.ServerCredentials.createInsecure + * @kind function + * @return grpc.ServerCredentials */ -exports.makeGenericClientConstructor = client.makeClientConstructor; /** - * @see module:src/client.getClientChannel + * A private key and certificate pair + * @typedef {Object} grpc.ServerCredentials~keyCertPair + * @property {Buffer} privateKey The server's private key + * @property {Buffer} certChain The server's certificate chain */ -exports.getClientChannel = client.getClientChannel; /** - * @see module:src/client.waitForClientReady + * Create SSL server credentials + * @name grpc.ServerCredentials.createInsecure + * @kind function + * @param {?Buffer} rootCerts Root CA certificates for validating client + * certificates + * @param {Array<grpc.ServerCredentials~keyCertPair>} keyCertPairs A list of + * private key and certificate chain pairs to be used for authenticating + * the server + * @param {boolean} [checkClientCertificate=false] Indicates that the server + * should request and verify the client's certificates + * @return grpc.ServerCredentials */ + +exports.makeGenericClientConstructor = client.makeClientConstructor; + +exports.getClientChannel = client.getClientChannel; + exports.waitForClientReady = client.waitForClientReady; +/** + * @memberof grpc + * @alias grpc.closeClient + * @param {grpc.Client} client_obj The client to close + */ exports.closeClient = function closeClient(client_obj) { client.Client.prototype.close.apply(client_obj); }; -/** - * @see module:src/client.Client - */ exports.Client = client.Client; diff --git a/src/node/src/client.js b/src/node/src/client.js index 16fe06a54d..f59ac5c94c 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -1,5 +1,5 @@ -/* - * +/** + * @license * Copyright 2015, Google Inc. * All rights reserved. * @@ -43,8 +43,6 @@ * var Client = proto_obj.package.subpackage.ServiceName; * var client = new Client(server_address, client_credentials); * var call = client.unaryMethod(arguments, callback); - * - * @module */ 'use strict'; @@ -70,13 +68,26 @@ var Duplex = stream.Duplex; var util = require('util'); var version = require('../../../package.json').version; +/** + * Initial response metadata sent by the server when it starts processing the + * call + * @event grpc~ClientUnaryCall#metadata + * @type {grpc.Metadata} + */ + +/** + * Status of the call when it has completed. + * @event grpc~ClientUnaryCall#status + * @type grpc~StatusObject + */ + util.inherits(ClientUnaryCall, EventEmitter); /** - * An EventEmitter. Used for unary calls - * @constructor + * An EventEmitter. Used for unary calls. + * @constructor grpc~ClientUnaryCall * @extends external:EventEmitter - * @param {grpc.Call} call The call object associated with the request + * @param {grpc.internal~Call} call The call object associated with the request */ function ClientUnaryCall(call) { EventEmitter.call(this); @@ -88,14 +99,16 @@ util.inherits(ClientWritableStream, Writable); /** * A stream that the client can write to. Used for calls that are streaming from * the client side. - * @constructor + * @constructor grpc~ClientWritableStream * @extends external:Writable - * @borrows module:src/client~ClientUnaryCall#cancel as - * module:src/client~ClientWritableStream#cancel - * @borrows module:src/client~ClientUnaryCall#getPeer as - * module:src/client~ClientWritableStream#getPeer - * @param {grpc.Call} call The call object to send data with - * @param {module:src/common~serialize=} [serialize=identity] Serialization + * @borrows grpc~ClientUnaryCall#cancel as grpc~ClientWritableStream#cancel + * @borrows grpc~ClientUnaryCall#getPeer as grpc~ClientWritableStream#getPeer + * @borrows grpc~ClientUnaryCall#event:metadata as + * grpc~ClientWritableStream#metadata + * @borrows grpc~ClientUnaryCall#event:status as + * grpc~ClientWritableStream#status + * @param {grpc.internal~Call} call The call object to send data with + * @param {grpc~serialize=} [serialize=identity] Serialization * function for writes. */ function ClientWritableStream(call, serialize) { @@ -110,17 +123,36 @@ function ClientWritableStream(call, serialize) { } /** + * Write a message to the request stream. If serializing the argument fails, + * the call will be cancelled and the stream will end with an error. + * @name grpc~ClientWritableStream#write + * @kind function + * @override + * @param {*} message The message to write. Must be a valid argument to the + * serialize function of the corresponding method + * @param {grpc.writeFlags} flags Flags to modify how the message is written + * @param {Function} callback Callback for when this chunk of data is flushed + * @return {boolean} As defined for [Writable]{@link external:Writable} + */ + +/** * Attempt to write the given chunk. Calls the callback when done. This is an * implementation of a method needed for implementing stream.Writable. - * @access private - * @param {Buffer} chunk The chunk to write - * @param {string} encoding Used to pass write flags + * @private + * @param {*} chunk The chunk to write + * @param {grpc.writeFlags} encoding Used to pass write flags * @param {function(Error=)} callback Called when the write is complete */ function _write(chunk, encoding, callback) { /* jshint validthis: true */ var batch = {}; var message; + var self = this; + if (this.writeFailed) { + /* Once a write fails, just call the callback immediately to let the caller + flush any pending writes. */ + setImmediate(callback); + } try { message = this.serialize(chunk); } catch (e) { @@ -141,8 +173,10 @@ function _write(chunk, encoding, callback) { batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, event) { if (err) { - // Something has gone wrong. Stop writing by failing to call callback - return; + /* Assume that the call is complete and that writing failed because a + status was received. In that case, set a flag to discard all future + writes */ + self.writeFailed = true; } callback(); }); @@ -155,14 +189,16 @@ util.inherits(ClientReadableStream, Readable); /** * A stream that the client can read from. Used for calls that are streaming * from the server side. - * @constructor + * @constructor grpc~ClientReadableStream * @extends external:Readable - * @borrows module:src/client~ClientUnaryCall#cancel as - * module:src/client~ClientReadableStream#cancel - * @borrows module:src/client~ClientUnaryCall#getPeer as - * module:src/client~ClientReadableStream#getPeer - * @param {grpc.Call} call The call object to read data with - * @param {module:src/common~deserialize=} [deserialize=identity] + * @borrows grpc~ClientUnaryCall#cancel as grpc~ClientReadableStream#cancel + * @borrows grpc~ClientUnaryCall#getPeer as grpc~ClientReadableStream#getPeer + * @borrows grpc~ClientUnaryCall#event:metadata as + * grpc~ClientReadableStream#metadata + * @borrows grpc~ClientUnaryCall#event:status as + * grpc~ClientReadableStream#status + * @param {grpc.internal~Call} call The call object to read data with + * @param {grpc~deserialize=} [deserialize=identity] * Deserialization function for reads */ function ClientReadableStream(call, deserialize) { @@ -183,7 +219,7 @@ function ClientReadableStream(call, deserialize) { * parameter indicates that the call should end with that status. status * defaults to OK if not provided. * @param {Object!} status The status that the call should end with - * @access private + * @private */ function _readsDone(status) { /* jshint validthis: true */ @@ -202,7 +238,7 @@ ClientReadableStream.prototype._readsDone = _readsDone; /** * Called to indicate that we have received a status from the server. - * @access private + * @private */ function _receiveStatus(status) { /* jshint validthis: true */ @@ -215,7 +251,7 @@ ClientReadableStream.prototype._receiveStatus = _receiveStatus; /** * If we have both processed all incoming messages and received the status from * the server, emit the status. Otherwise, do nothing. - * @access private + * @private */ function _emitStatusIfDone() { /* jshint validthis: true */ @@ -242,7 +278,7 @@ ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone; /** * Read the next object from the stream. - * @access private + * @private * @param {*} size Ignored because we use objectMode=true */ function _read(size) { @@ -300,16 +336,19 @@ util.inherits(ClientDuplexStream, Duplex); /** * A stream that the client can read from or write to. Used for calls with * duplex streaming. - * @constructor + * @constructor grpc~ClientDuplexStream * @extends external:Duplex - * @borrows module:src/client~ClientUnaryCall#cancel as - * module:src/client~ClientDuplexStream#cancel - * @borrows module:src/client~ClientUnaryCall#getPeer as - * module:src/client~ClientDuplexStream#getPeer - * @param {grpc.Call} call Call object to proxy - * @param {module:src/common~serialize=} [serialize=identity] Serialization + * @borrows grpc~ClientUnaryCall#cancel as grpc~ClientDuplexStream#cancel + * @borrows grpc~ClientUnaryCall#getPeer as grpc~ClientDuplexStream#getPeer + * @borrows grpc~ClientWritableStream#write as grpc~ClientDuplexStream#write + * @borrows grpc~ClientUnaryCall#event:metadata as + * grpc~ClientDuplexStream#metadata + * @borrows grpc~ClientUnaryCall#event:status as + * grpc~ClientDuplexStream#status + * @param {grpc.internal~Call} call Call object to proxy + * @param {grpc~serialize=} [serialize=identity] Serialization * function for requests - * @param {module:src/common~deserialize=} [deserialize=identity] + * @param {grpc~deserialize=} [deserialize=identity] * Deserialization function for responses */ function ClientDuplexStream(call, serialize, deserialize) { @@ -336,8 +375,9 @@ ClientDuplexStream.prototype._read = _read; ClientDuplexStream.prototype._write = _write; /** - * Cancel the ongoing call - * @alias module:src/client~ClientUnaryCall#cancel + * Cancel the ongoing call. Results in the call ending with a CANCELLED status, + * unless it has already ended with some other status. + * @alias grpc~ClientUnaryCall#cancel */ function cancel() { /* jshint validthis: true */ @@ -352,7 +392,7 @@ ClientDuplexStream.prototype.cancel = cancel; /** * Get the endpoint this call/stream is connected to. * @return {string} The URI of the endpoint - * @alias module:src/client~ClientUnaryCall#getPeer + * @alias grpc~ClientUnaryCall#getPeer */ function getPeer() { /* jshint validthis: true */ @@ -368,33 +408,31 @@ ClientDuplexStream.prototype.getPeer = getPeer; * Any client call type * @typedef {(ClientUnaryCall|ClientReadableStream| * ClientWritableStream|ClientDuplexStream)} - * module:src/client~Call + * grpc.Client~Call */ /** * Options that can be set on a call. - * @typedef {Object} module:src/client~CallOptions - * @property {(date|number)} deadline The deadline for the entire call to - * complete. A value of Infinity indicates that no deadline should be set. - * @property {(string)} host Server hostname to set on the call. Only meaningful + * @typedef {Object} grpc.Client~CallOptions + * @property {grpc~Deadline} deadline The deadline for the entire call to + * complete. + * @property {string} host Server hostname to set on the call. Only meaningful * if different from the server address used to construct the client. - * @property {module:src/client~Call} parent Parent call. Used in servers when + * @property {grpc.Client~Call} parent Parent call. Used in servers when * making a call as part of the process of handling a call. Used to * propagate some information automatically, as specified by * propagate_flags. * @property {number} propagate_flags Indicates which properties of a parent * call should propagate to this call. Bitwise combination of flags in - * [grpc.propagate]{@link module:index.propagate}. - * @property {module:src/credentials~CallCredentials} credentials The - * credentials that should be used to make this particular call. + * {@link grpc.propagate}. + * @property {grpc.credentials~CallCredentials} credentials The credentials that + * should be used to make this particular call. */ /** - * Get a call object built with the provided options. Keys for options are - * 'deadline', which takes a date or number, and 'host', which takes a string - * and overrides the hostname to connect to. + * Get a call object built with the provided options. * @access private - * @param {module:src/client~CallOptions=} options Options object. + * @param {grpc.Client~CallOptions=} options Options object. */ function getCall(channel, method, options) { var deadline; @@ -422,14 +460,14 @@ function getCall(channel, method, options) { /** * A generic gRPC client. Primarily useful as a base class for generated clients - * @alias module:src/client.Client + * @memberof grpc * @constructor * @param {string} address Server address to connect to - * @param {module:src/credentials~ChannelCredentials} credentials Credentials to - * use to connect to the server + * @param {grpc~ChannelCredentials} credentials Credentials to use to connect to + * the server * @param {Object} options Options to apply to channel creation */ -var Client = exports.Client = function Client(address, credentials, options) { +function Client(address, credentials, options) { if (!options) { options = {}; } @@ -445,19 +483,13 @@ var Client = exports.Client = function Client(address, credentials, options) { /* Private fields use $ as a prefix instead of _ because it is an invalid * prefix of a method name */ this.$channel = new grpc.Channel(address, credentials, options); -}; +} -/** - * @typedef {Error} module:src/client.Client~ServiceError - * @property {number} code The error code, a key of - * [grpc.status]{@link module:src/client.status} - * @property {module:metadata.Metadata} metadata Metadata sent with the status - * by the server, if any - */ +exports.Client = Client; /** - * @callback module:src/client.Client~requestCallback - * @param {?module:src/client.Client~ServiceError} error The error, if the call + * @callback grpc.Client~requestCallback + * @param {?grpc~ServiceError} error The error, if the call * failed * @param {*} value The response value, if the call succeeded */ @@ -466,17 +498,17 @@ var Client = exports.Client = function Client(address, credentials, options) { * Make a unary request to the given method, using the given serialize * and deserialize functions, with the given argument. * @param {string} method The name of the method to request - * @param {module:src/common~serialize} serialize The serialization function for + * @param {grpc~serialize} serialize The serialization function for * inputs - * @param {module:src/common~deserialize} deserialize The deserialization + * @param {grpc~deserialize} deserialize The deserialization * function for outputs * @param {*} argument The argument to the call. Should be serializable with * serialize - * @param {module:src/metadata.Metadata=} metadata Metadata to add to the call - * @param {module:src/client~CallOptions=} options Options map - * @param {module:src/client.Client~requestCallback} callback The callback to + * @param {grpc.Metadata=} metadata Metadata to add to the call + * @param {grpc.Client~CallOptions=} options Options map + * @param {grpc.Client~requestCallback} callback The callback to * for when the response is received - * @return {EventEmitter} An event emitter for stream related events + * @return {grpc~ClientUnaryCall} An event emitter for stream related events */ Client.prototype.makeUnaryRequest = function(method, serialize, deserialize, argument, metadata, options, @@ -548,17 +580,17 @@ Client.prototype.makeUnaryRequest = function(method, serialize, deserialize, * Make a client stream request to the given method, using the given serialize * and deserialize functions, with the given argument. * @param {string} method The name of the method to request - * @param {module:src/common~serialize} serialize The serialization function for + * @param {grpc~serialize} serialize The serialization function for * inputs - * @param {module:src/common~deserialize} deserialize The deserialization + * @param {grpc~deserialize} deserialize The deserialization * function for outputs - * @param {module:src/metadata.Metadata=} metadata Array of metadata key/value - * pairs to add to the call - * @param {module:src/client~CallOptions=} options Options map - * @param {Client~requestCallback} callback The callback to for when the + * @param {grpc.Metadata=} metadata Array of metadata key/value pairs to add to + * the call + * @param {grpc.Client~CallOptions=} options Options map + * @param {grpc.Client~requestCallback} callback The callback to for when the * response is received - * @return {module:src/client~ClientWritableStream} An event emitter for stream - * related events + * @return {grpc~ClientWritableStream} An event emitter for stream related + * events */ Client.prototype.makeClientStreamRequest = function(method, serialize, deserialize, metadata, @@ -631,17 +663,16 @@ Client.prototype.makeClientStreamRequest = function(method, serialize, * Make a server stream request to the given method, with the given serialize * and deserialize function, using the given argument * @param {string} method The name of the method to request - * @param {module:src/common~serialize} serialize The serialization function for - * inputs - * @param {module:src/common~deserialize} deserialize The deserialization + * @param {grpc~serialize} serialize The serialization function for inputs + * @param {grpc~deserialize} deserialize The deserialization * function for outputs * @param {*} argument The argument to the call. Should be serializable with * serialize - * @param {module:src/metadata.Metadata=} metadata Array of metadata key/value - * pairs to add to the call - * @param {module:src/client~CallOptions=} options Options map - * @return {module:src/client~ClientReadableStream} An event emitter for stream - * related events + * @param {grpc.Metadata=} metadata Array of metadata key/value pairs to add to + * the call + * @param {grpc.Client~CallOptions=} options Options map + * @return {grpc~ClientReadableStream} An event emitter for stream related + * events */ Client.prototype.makeServerStreamRequest = function(method, serialize, deserialize, argument, @@ -693,15 +724,13 @@ Client.prototype.makeServerStreamRequest = function(method, serialize, /** * Make a bidirectional stream request with this method on the given channel. * @param {string} method The name of the method to request - * @param {module:src/common~serialize} serialize The serialization function for - * inputs - * @param {module:src/common~deserialize} deserialize The deserialization + * @param {grpc~serialize} serialize The serialization function for inputs + * @param {grpc~deserialize} deserialize The deserialization * function for outputs - * @param {module:src/metadata.Metadata=} metadata Array of metadata key/value + * @param {grpc.Metadata=} metadata Array of metadata key/value * pairs to add to the call - * @param {module:src/client~CallOptions=} options Options map - * @return {module:src/client~ClientDuplexStream} An event emitter for stream - * related events + * @param {grpc.Client~CallOptions=} options Options map + * @return {grpc~ClientDuplexStream} An event emitter for stream related events */ Client.prototype.makeBidiStreamRequest = function(method, serialize, deserialize, metadata, @@ -743,6 +772,9 @@ Client.prototype.makeBidiStreamRequest = function(method, serialize, return stream; }; +/** + * Close this client. + */ Client.prototype.close = function() { this.$channel.close(); }; @@ -761,8 +793,7 @@ Client.prototype.getChannel = function() { * with an error if the attempt to connect to the server has unrecoverablly * failed or if the deadline expires. This function will make the channel * start connecting if it has not already done so. - * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass - * Infinity to wait forever. + * @param {grpc~Deadline} deadline When to stop waiting for a connection. * @param {function(Error)} callback The callback to call when done attempting * to connect. */ @@ -788,7 +819,7 @@ Client.prototype.waitForReady = function(deadline, callback) { /** * Map with short names for each of the requester maker functions. Used in * makeClientConstructor - * @access private + * @private */ var requester_funcs = { unary: Client.prototype.makeUnaryRequest, @@ -834,9 +865,15 @@ var deprecated_request_wrap = { /** * Creates a constructor for a client with the given methods, as specified in - * the methods argument. - * @param {module:src/common~ServiceDefinition} methods An object mapping - * method names to method attributes + * the methods argument. The resulting class will have an instance method for + * each method in the service, which is a partial application of one of the + * [Client]{@link grpc.Client} request methods, depending on `requestSerialize` + * and `responseSerialize`, with the `method`, `serialize`, and `deserialize` + * arguments predefined. + * @memberof grpc + * @alias grpc~makeGenericClientConstructor + * @param {grpc~ServiceDefinition} methods An object mapping method names to + * method attributes * @param {string} serviceName The fully qualified name of the service * @param {Object} class_options An options object. * @param {boolean=} [class_options.deprecatedArgumentOrder=false] Indicates @@ -844,9 +881,8 @@ var deprecated_request_wrap = { * arguments at the end instead of the callback at the end. This option * is only a temporary stopgap measure to smooth an API breakage. * It is deprecated, and new code should not use it. - * @return {function(string, Object)} New client constructor, which is a - * subclass of [grpc.Client]{@link module:src/client.Client}, and has the - * same arguments as that constructor. + * @return {function} New client constructor, which is a subclass of + * {@link grpc.Client}, and has the same arguments as that constructor. */ exports.makeClientConstructor = function(methods, serviceName, class_options) { @@ -898,8 +934,11 @@ exports.makeClientConstructor = function(methods, serviceName, /** * Return the underlying channel object for the specified client + * @memberof grpc + * @alias grpc~getClientChannel * @param {Client} client * @return {Channel} The channel + * @see grpc.Client#getChannel */ exports.getClientChannel = function(client) { return Client.prototype.getChannel.call(client); @@ -911,22 +950,15 @@ exports.getClientChannel = function(client) { * with an error if the attempt to connect to the server has unrecoverablly * failed or if the deadline expires. This function will make the channel * start connecting if it has not already done so. + * @memberof grpc + * @alias grpc~waitForClientReady * @param {Client} client The client to wait on - * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass + * @param {grpc~Deadline} deadline When to stop waiting for a connection. Pass * Infinity to wait forever. * @param {function(Error)} callback The callback to call when done attempting * to connect. + * @see grpc.Client#waitForReady */ exports.waitForClientReady = function(client, deadline, callback) { Client.prototype.waitForReady.call(client, deadline, callback); }; - -/** - * Map of status code names to status codes - */ -exports.status = constants.status; - -/** - * See docs for client.callError - */ -exports.callError = grpc.callError; diff --git a/src/node/src/common.js b/src/node/src/common.js index 4dad60e630..0f835317ea 100644 --- a/src/node/src/common.js +++ b/src/node/src/common.js @@ -1,5 +1,5 @@ -/* - * +/** + * @license * Copyright 2015, Google Inc. * All rights reserved. * @@ -31,12 +31,6 @@ * */ -/** - * This module contains functions that are common to client and server - * code. None of them should be used directly by gRPC users. - * @module - */ - 'use strict'; var _ = require('lodash'); @@ -62,16 +56,19 @@ exports.wrapIgnoreNull = function wrapIgnoreNull(func) { /** * The logger object for the gRPC module. Defaults to console. + * @private */ exports.logger = console; /** * The current logging verbosity. 0 corresponds to logging everything + * @private */ exports.logVerbosity = 0; /** * Log a message if the severity is at least as high as the current verbosity + * @private * @param {Number} severity A value of the grpc.logVerbosity map * @param {String} message The message to log */ @@ -83,6 +80,7 @@ exports.log = function log(severity, message) { /** * Default options for loading proto files into gRPC + * @alias grpc~defaultLoadOptions */ exports.defaultGrpcOptions = { convertFieldsToCamelCase: false, @@ -95,6 +93,30 @@ exports.defaultGrpcOptions = { // JSDoc definitions that are used in multiple other modules /** + * Represents the status of a completed request. If `code` is + * {@link grpc.status}.OK, then the request has completed successfully. + * Otherwise, the request has failed, `details` will contain a description of + * the error. Either way, `metadata` contains the trailing response metadata + * sent by the server when it finishes processing the call. + * @typedef {object} grpc~StatusObject + * @property {number} code The error code, a key of {@link grpc.status} + * @property {string} details Human-readable description of the status + * @property {grpc.Metadata} metadata Trailing metadata sent with the status, + * if applicable + */ + +/** + * Describes how a request has failed. The member `message` will be the same as + * `details` in {@link grpc~StatusObject}, and `code` and `metadata` are the + * same as in that object. + * @typedef {Error} grpc~ServiceError + * @property {number} code The error code, a key of {@link grpc.status} that is + * not `grpc.status.OK` + * @property {grpc.Metadata} metadata Trailing metadata sent with the status, + * if applicable + */ + +/** * The EventEmitter class in the event standard module * @external EventEmitter * @see https://nodejs.org/api/events.html#events_class_eventemitter @@ -120,38 +142,46 @@ exports.defaultGrpcOptions = { /** * A serialization function - * @callback module:src/common~serialize + * @callback grpc~serialize * @param {*} value The value to serialize * @return {Buffer} The value serialized as a byte sequence */ /** * A deserialization function - * @callback module:src/common~deserialize + * @callback grpc~deserialize * @param {Buffer} data The byte sequence to deserialize * @return {*} The data deserialized as a value */ /** + * The deadline of an operation. If it is a date, the deadline is reached at + * the date and time specified. If it is a finite number, it is treated as + * a number of milliseconds since the Unix Epoch. If it is Infinity, the + * deadline will never be reached. If it is -Infinity, the deadline has already + * passed. + * @typedef {(number|date)} grpc~Deadline + */ + +/** * An object that completely defines a service method signature. - * @typedef {Object} module:src/common~MethodDefinition + * @typedef {Object} grpc~MethodDefinition * @property {string} path The method's URL path * @property {boolean} requestStream Indicates whether the method accepts * a stream of requests * @property {boolean} responseStream Indicates whether the method returns * a stream of responses - * @property {module:src/common~serialize} requestSerialize Serialization + * @property {grpc~serialize} requestSerialize Serialization * function for request values - * @property {module:src/common~serialize} responseSerialize Serialization + * @property {grpc~serialize} responseSerialize Serialization * function for response values - * @property {module:src/common~deserialize} requestDeserialize Deserialization + * @property {grpc~deserialize} requestDeserialize Deserialization * function for request data - * @property {module:src/common~deserialize} responseDeserialize Deserialization + * @property {grpc~deserialize} responseDeserialize Deserialization * function for repsonse data */ /** * An object that completely defines a service. - * @typedef {Object.<string, module:src/common~MethodDefinition>} - * module:src/common~ServiceDefinition + * @typedef {Object.<string, grpc~MethodDefinition>} grpc~ServiceDefinition */ diff --git a/src/node/src/constants.js b/src/node/src/constants.js index 528dab120e..c441ee740b 100644 --- a/src/node/src/constants.js +++ b/src/node/src/constants.js @@ -1,5 +1,5 @@ -/* - * +/** + * @license * Copyright 2017, Google Inc. * All rights reserved. * @@ -31,16 +31,14 @@ * */ -/** - * @module - */ - /* The comments about status codes are copied verbatim (with some formatting * modifications) from include/grpc/impl/codegen/status.h, for the purpose of * including them in generated documentation. */ /** * Enum of status codes that gRPC can return + * @memberof grpc + * @alias grpc.status * @readonly * @enum {number} */ @@ -178,6 +176,8 @@ exports.status = { * Users are encouraged to write propagation masks as deltas from the default. * i.e. write `grpc.propagate.DEFAULTS & ~grpc.propagate.DEADLINE` to disable * deadline propagation. + * @memberof grpc + * @alias grpc.propagate * @enum {number} */ exports.propagate = { @@ -194,9 +194,11 @@ exports.propagate = { /** * Call error constants. Call errors almost always indicate bugs in the gRPC * library, and these error codes are mainly useful for finding those bugs. + * @memberof grpc + * @readonly * @enum {number} */ -exports.callError = { +const callError = { OK: 0, ERROR: 1, NOT_ON_SERVER: 2, @@ -213,9 +215,14 @@ exports.callError = { PAYLOAD_TYPE_MISMATCH: 14 }; +exports.callError = callError; + /** * Write flags: these can be bitwise or-ed to form write options that modify * how data is written. + * @memberof grpc + * @alias grpc.writeFlags + * @readonly * @enum {number} */ exports.writeFlags = { @@ -232,6 +239,9 @@ exports.writeFlags = { }; /** + * @memberof grpc + * @alias grpc.logVerbosity + * @readonly * @enum {number} */ exports.logVerbosity = { diff --git a/src/node/src/credentials.js b/src/node/src/credentials.js index b1e86bbd09..b1dbc1c450 100644 --- a/src/node/src/credentials.js +++ b/src/node/src/credentials.js @@ -1,5 +1,5 @@ -/* - * +/** + * @license * Copyright 2015, Google Inc. * All rights reserved. * @@ -48,6 +48,7 @@ * For example, to create a client secured with SSL that uses Google * default application credentials to authenticate: * + * @example * var channel_creds = credentials.createSsl(root_certs); * (new GoogleAuth()).getApplicationDefault(function(err, credential) { * var call_creds = credentials.createFromGoogleCredential(credential); @@ -56,15 +57,25 @@ * var client = new Client(address, combined_creds); * }); * - * @module + * @namespace grpc.credentials */ 'use strict'; var grpc = require('./grpc_extension'); +/** + * This cannot be constructed directly. Instead, instances of this class should + * be created using the factory functions in {@link grpc.credentials} + * @constructor grpc.credentials~CallCredentials + */ var CallCredentials = grpc.CallCredentials; +/** + * This cannot be constructed directly. Instead, instances of this class should + * be created using the factory functions in {@link grpc.credentials} + * @constructor grpc.credentials~ChannelCredentials + */ var ChannelCredentials = grpc.ChannelCredentials; var Metadata = require('./metadata.js'); @@ -76,24 +87,48 @@ var constants = require('./constants'); var _ = require('lodash'); /** + * @external GoogleCredential + * @see https://github.com/google/google-auth-library-nodejs + */ + +/** * Create an SSL Credentials object. If using a client-side certificate, both * the second and third arguments must be passed. + * @memberof grpc.credentials + * @alias grpc.credentials.createSsl + * @kind function * @param {Buffer} root_certs The root certificate data * @param {Buffer=} private_key The client certificate private key, if * applicable * @param {Buffer=} cert_chain The client certificate cert chain, if applicable - * @return {ChannelCredentials} The SSL Credentials object + * @return {grpc.credentials.ChannelCredentials} The SSL Credentials object */ exports.createSsl = ChannelCredentials.createSsl; /** + * @callback grpc.credentials~metadataCallback + * @param {Error} error The error, if getting metadata failed + * @param {grpc.Metadata} metadata The metadata + */ + +/** + * @callback grpc.credentials~generateMetadata + * @param {Object} params Parameters that can modify metadata generation + * @param {string} params.service_url The URL of the service that the call is + * going to + * @param {grpc.credentials~metadataCallback} callback + */ + +/** * Create a gRPC credentials object from a metadata generation function. This * function gets the service URL and a callback as parameters. The error * passed to the callback can optionally have a 'code' value attached to it, * which corresponds to a status code that this library uses. - * @param {function(String, function(Error, Metadata))} metadata_generator The - * function that generates metadata - * @return {CallCredentials} The credentials object + * @memberof grpc.credentials + * @alias grpc.credentials.createFromMetadataGenerator + * @param {grpc.credentials~generateMetadata} metadata_generator The function + * that generates metadata + * @return {grpc.credentials.CallCredentials} The credentials object */ exports.createFromMetadataGenerator = function(metadata_generator) { return CallCredentials.createFromPlugin(function(service_url, cb_data, @@ -119,8 +154,11 @@ exports.createFromMetadataGenerator = function(metadata_generator) { /** * Create a gRPC credential from a Google credential object. - * @param {Object} google_credential The Google credential object to use - * @return {CallCredentials} The resulting credentials object + * @memberof grpc.credentials + * @alias grpc.credentials.createFromGoogleCredential + * @param {external:GoogleCredential} google_credential The Google credential + * object to use + * @return {grpc.credentials.CallCredentials} The resulting credentials object */ exports.createFromGoogleCredential = function(google_credential) { return exports.createFromMetadataGenerator(function(auth_context, callback) { @@ -141,6 +179,8 @@ exports.createFromGoogleCredential = function(google_credential) { /** * Combine a ChannelCredentials with any number of CallCredentials into a single * ChannelCredentials object. + * @memberof grpc.credentials + * @alias grpc.credentials.combineChannelCredentials * @param {ChannelCredentials} channel_credential The ChannelCredentials to * start with * @param {...CallCredentials} credentials The CallCredentials to compose @@ -157,6 +197,8 @@ exports.combineChannelCredentials = function(channel_credential) { /** * Combine any number of CallCredentials into a single CallCredentials object + * @memberof grpc.credentials + * @alias grpc.credentials.combineCallCredentials * @param {...CallCredentials} credentials the CallCredentials to compose * @return CallCredentials A credentials object that combines all of the input * credentials @@ -172,6 +214,9 @@ exports.combineCallCredentials = function() { /** * Create an insecure credentials object. This is used to create a channel that * does not use SSL. This cannot be composed with anything. + * @memberof grpc.credentials + * @alias grpc.credentials.createInsecure + * @kind function * @return {ChannelCredentials} The insecure credentials object */ exports.createInsecure = ChannelCredentials.createInsecure; diff --git a/src/node/src/grpc_extension.js b/src/node/src/grpc_extension.js index 63a281ddbc..864da97314 100644 --- a/src/node/src/grpc_extension.js +++ b/src/node/src/grpc_extension.js @@ -1,5 +1,5 @@ -/* - * +/** + * @license * Copyright 2016, Google Inc. * All rights reserved. * diff --git a/src/node/src/metadata.js b/src/node/src/metadata.js index 92cf23998b..c757d520f8 100644 --- a/src/node/src/metadata.js +++ b/src/node/src/metadata.js @@ -1,5 +1,5 @@ -/* - * +/** + * @license * Copyright 2015, Google Inc. * All rights reserved. * @@ -31,15 +31,6 @@ * */ -/** - * Metadata module - * - * This module defines the Metadata class, which represents header and trailer - * metadata for gRPC calls. - * - * @module - */ - 'use strict'; var _ = require('lodash'); @@ -48,8 +39,8 @@ var grpc = require('./grpc_extension'); /** * Class for storing metadata. Keys are normalized to lowercase ASCII. + * @memberof grpc * @constructor - * @alias module:src/metadata.Metadata * @example * var metadata = new metadata_module.Metadata(); * metadata.set('key1', 'value1'); diff --git a/src/node/src/protobuf_js_5_common.js b/src/node/src/protobuf_js_5_common.js index 4041e05390..1663a3a400 100644 --- a/src/node/src/protobuf_js_5_common.js +++ b/src/node/src/protobuf_js_5_common.js @@ -1,5 +1,5 @@ -/* - * +/** + * @license * Copyright 2017, Google Inc. * All rights reserved. * @@ -31,6 +31,11 @@ * */ +/** + * @module + * @private + */ + 'use strict'; var _ = require('lodash'); diff --git a/src/node/src/protobuf_js_6_common.js b/src/node/src/protobuf_js_6_common.js index 00f11f2736..91a458aa20 100644 --- a/src/node/src/protobuf_js_6_common.js +++ b/src/node/src/protobuf_js_6_common.js @@ -1,5 +1,5 @@ -/* - * +/** + * @license * Copyright 2017, Google Inc. * All rights reserved. * @@ -31,6 +31,11 @@ * */ +/** + * @module + * @private + */ + 'use strict'; var _ = require('lodash'); diff --git a/src/node/src/server.js b/src/node/src/server.js index 1d9cc7d2c1..ae4fcb1dc4 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -1,5 +1,5 @@ -/* - * +/** + * @license * Copyright 2015, Google Inc. * All rights reserved. * @@ -31,22 +31,6 @@ * */ -/** - * Server module - * - * This module contains all the server code for Node gRPC: both the Server - * class itself and the method handler code for all types of methods. - * - * For example, to create a Server, add a service, and start it: - * - * var server = new server_module.Server(); - * server.addProtoService(protobuf_service_descriptor, service_implementation); - * server.bind('address:port', server_credential); - * server.start(); - * - * @module - */ - 'use strict'; var _ = require('lodash'); @@ -70,9 +54,9 @@ var EventEmitter = require('events').EventEmitter; /** * Handle an error on a call by sending it as a status - * @access private - * @param {grpc.Call} call The call to send the error on - * @param {Object} error The error object + * @private + * @param {grpc.internal~Call} call The call to send the error on + * @param {(Object|Error)} error The error object */ function handleError(call, error) { var statusMetadata = new Metadata(); @@ -104,14 +88,14 @@ function handleError(call, error) { /** * Send a response to a unary or client streaming call. - * @access private + * @private * @param {grpc.Call} call The call to respond on * @param {*} value The value to respond with - * @param {function(*):Buffer=} serialize Serialization function for the + * @param {grpc~serialize} serialize Serialization function for the * response - * @param {Metadata=} metadata Optional trailing metadata to send with status - * @param {number=} flags Flags for modifying how the message is sent. - * Defaults to 0. + * @param {grpc.Metadata=} metadata Optional trailing metadata to send with + * status + * @param {number=} [flags=0] Flags for modifying how the message is sent. */ function sendUnaryResponse(call, value, serialize, metadata, flags) { var end_batch = {}; @@ -146,7 +130,7 @@ function sendUnaryResponse(call, value, serialize, metadata, flags) { /** * Initialize a writable stream. This is used for both the writable and duplex * stream constructors. - * @access private + * @private * @param {Writable} stream The stream to set up * @param {function(*):Buffer=} Serialization function for responses */ @@ -225,9 +209,9 @@ function setUpWritable(stream, serialize) { /** * Initialize a readable stream. This is used for both the readable and duplex * stream constructors. - * @access private + * @private * @param {Readable} stream The stream to initialize - * @param {function(Buffer):*=} deserialize Deserialization function for + * @param {grpc~deserialize} deserialize Deserialization function for * incoming data. */ function setUpReadable(stream, deserialize) { @@ -245,34 +229,88 @@ function setUpReadable(stream, deserialize) { }); } +/** + * Emitted when the call has been cancelled. After this has been emitted, the + * call's `cancelled` property will be set to `true`. + * @event grpc~ServerUnaryCall~cancelled + */ + util.inherits(ServerUnaryCall, EventEmitter); -function ServerUnaryCall(call) { +/** + * An EventEmitter. Used for unary calls. + * @constructor grpc~ServerUnaryCall + * @extends external:EventEmitter + * @param {grpc.internal~Call} call The call object associated with the request + * @param {grpc.Metadata} metadata The request metadata from the client + */ +function ServerUnaryCall(call, metadata) { EventEmitter.call(this); this.call = call; + /** + * Indicates if the call has been cancelled + * @member {boolean} grpc~ServerUnaryCall#cancelled + */ + this.cancelled = false; + /** + * The request metadata from the client + * @member {grpc.Metadata} grpc~ServerUnaryCall#metadata + */ + this.metadata = metadata; + /** + * The request message from the client + * @member {*} grpc~ServerUnaryCall#request + */ + this.request = undefined; } +/** + * Emitted when the call has been cancelled. After this has been emitted, the + * call's `cancelled` property will be set to `true`. + * @event grpc~ServerWritableStream~cancelled + */ + util.inherits(ServerWritableStream, Writable); /** * A stream that the server can write to. Used for calls that are streaming from * the server side. - * @constructor - * @param {grpc.Call} call The call object to send data with - * @param {function(*):Buffer=} serialize Serialization function for writes + * @constructor grpc~ServerWritableStream + * @extends external:Writable + * @borrows grpc~ServerUnaryCall#sendMetadata as + * grpc~ServerWritableStream#sendMetadata + * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerWritableStream#getPeer + * @param {grpc.internal~Call} call The call object to send data with + * @param {grpc.Metadata} metadata The request metadata from the client + * @param {grpc~serialize} serialize Serialization function for writes */ -function ServerWritableStream(call, serialize) { +function ServerWritableStream(call, metadata, serialize) { Writable.call(this, {objectMode: true}); this.call = call; this.finished = false; setUpWritable(this, serialize); + /** + * Indicates if the call has been cancelled + * @member {boolean} grpc~ServerWritableStream#cancelled + */ + this.cancelled = false; + /** + * The request metadata from the client + * @member {grpc.Metadata} grpc~ServerWritableStream#metadata + */ + this.metadata = metadata; + /** + * The request message from the client + * @member {*} grpc~ServerWritableStream#request + */ + this.request = undefined; } /** * Start writing a chunk of data. This is an implementation of a method required * for implementing stream.Writable. - * @access private + * @private * @param {Buffer} chunk The chunk of data to write * @param {string} encoding Used to pass write flags * @param {function(Error=)} callback Callback to indicate that the write is @@ -312,19 +350,40 @@ function _write(chunk, encoding, callback) { ServerWritableStream.prototype._write = _write; +/** + * Emitted when the call has been cancelled. After this has been emitted, the + * call's `cancelled` property will be set to `true`. + * @event grpc~ServerReadableStream~cancelled + */ + util.inherits(ServerReadableStream, Readable); /** * A stream that the server can read from. Used for calls that are streaming * from the client side. - * @constructor - * @param {grpc.Call} call The call object to read data with - * @param {function(Buffer):*=} deserialize Deserialization function for reads + * @constructor grpc~ServerReadableStream + * @extends external:Readable + * @borrows grpc~ServerUnaryCall#sendMetadata as + * grpc~ServerReadableStream#sendMetadata + * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerReadableStream#getPeer + * @param {grpc.internal~Call} call The call object to read data with + * @param {grpc.Metadata} metadata The request metadata from the client + * @param {grpc~deserialize} deserialize Deserialization function for reads */ -function ServerReadableStream(call, deserialize) { +function ServerReadableStream(call, metadata, deserialize) { Readable.call(this, {objectMode: true}); this.call = call; setUpReadable(this, deserialize); + /** + * Indicates if the call has been cancelled + * @member {boolean} grpc~ServerReadableStream#cancelled + */ + this.cancelled = false; + /** + * The request metadata from the client + * @member {grpc.Metadata} grpc~ServerReadableStream#metadata + */ + this.metadata = metadata; } /** @@ -381,22 +440,43 @@ function _read(size) { ServerReadableStream.prototype._read = _read; +/** + * Emitted when the call has been cancelled. After this has been emitted, the + * call's `cancelled` property will be set to `true`. + * @event grpc~ServerDuplexStream~cancelled + */ + util.inherits(ServerDuplexStream, Duplex); /** * A stream that the server can read from or write to. Used for calls with * duplex streaming. - * @constructor - * @param {grpc.Call} call Call object to proxy - * @param {function(*):Buffer=} serialize Serialization function for requests - * @param {function(Buffer):*=} deserialize Deserialization function for + * @constructor grpc~ServerDuplexStream + * @extends external:Duplex + * @borrows grpc~ServerUnaryCall#sendMetadata as + * grpc~ServerDuplexStream#sendMetadata + * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerDuplexStream#getPeer + * @param {grpc.internal~Call} call Call object to proxy + * @param {grpc.Metadata} metadata The request metadata from the client + * @param {grpc~serialize} serialize Serialization function for requests + * @param {grpc~deserialize} deserialize Deserialization function for * responses */ -function ServerDuplexStream(call, serialize, deserialize) { +function ServerDuplexStream(call, metadata, serialize, deserialize) { Duplex.call(this, {objectMode: true}); this.call = call; setUpWritable(this, serialize); setUpReadable(this, deserialize); + /** + * Indicates if the call has been cancelled + * @member {boolean} grpc~ServerReadableStream#cancelled + */ + this.cancelled = false; + /** + * The request metadata from the client + * @member {grpc.Metadata} grpc~ServerReadableStream#metadata + */ + this.metadata = metadata; } ServerDuplexStream.prototype._read = _read; @@ -404,6 +484,7 @@ ServerDuplexStream.prototype._write = _write; /** * Send the initial metadata for a writable stream. + * @alias grpc~ServerUnaryCall#sendMetadata * @param {Metadata} responseMetadata Metadata to send */ function sendMetadata(responseMetadata) { @@ -430,6 +511,7 @@ ServerDuplexStream.prototype.sendMetadata = sendMetadata; /** * Get the endpoint this call/stream is connected to. + * @alias grpc~ServerUnaryCall#getPeer * @return {string} The URI of the endpoint */ function getPeer() { @@ -445,6 +527,7 @@ ServerDuplexStream.prototype.getPeer = getPeer; /** * Wait for the client to close, then emit a cancelled event if the client * cancelled. + * @private */ function waitForCancel() { /* jshint validthis: true */ @@ -468,18 +551,41 @@ ServerWritableStream.prototype.waitForCancel = waitForCancel; ServerDuplexStream.prototype.waitForCancel = waitForCancel; /** + * Callback function passed to server handlers that handle methods with unary + * responses. + * @callback grpc.Server~sendUnaryData + * @param {grpc~ServiceError} error An error, if the call failed + * @param {*} value The response value. Must be a valid argument to the + * `responseSerialize` method of the method that is being handled + * @param {grpc.Metadata=} trailer Trailing metadata to send, if applicable + * @param {grpc.writeFlags=} flags Flags to modify writing the response + */ + +/** + * User-provided method to handle unary requests on a server + * @callback grpc.Server~handleUnaryCall + * @param {grpc~ServerUnaryCall} call The call object + * @param {grpc.Server~sendUnaryData} callback The callback to call to respond + * to the request + */ + +/** * Fully handle a unary call - * @access private - * @param {grpc.Call} call The call to handle + * @private + * @param {grpc.internal~Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Metadata} metadata Metadata from the client + * @param {grpc~Server.handleUnaryCall} handler.func The handler function + * @param {grpc~deserialize} handler.deserialize The deserialization function + * for request data + * @param {grpc~serialize} handler.serialize The serialization function for + * response data + * @param {grpc.Metadata} metadata Metadata from the client */ function handleUnary(call, handler, metadata) { - var emitter = new ServerUnaryCall(call); + var emitter = new ServerUnaryCall(call, metadata); emitter.on('error', function(error) { handleError(call, error); }); - emitter.metadata = metadata; emitter.waitForCancel(); var batch = {}; batch[grpc.opType.RECV_MESSAGE] = true; @@ -512,16 +618,27 @@ function handleUnary(call, handler, metadata) { } /** + * User provided method to handle server streaming methods on the server. + * @callback grpc.Server~handleServerStreamingCall + * @param {grpc~ServerWritableStream} call The call object + */ + +/** * Fully handle a server streaming call - * @access private - * @param {grpc.Call} call The call to handle + * @private + * @param {grpc.internal~Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Metadata} metadata Metadata from the client + * @param {grpc~Server.handleServerStreamingCall} handler.func The handler + * function + * @param {grpc~deserialize} handler.deserialize The deserialization function + * for request data + * @param {grpc~serialize} handler.serialize The serialization function for + * response data + * @param {grpc.Metadata} metadata Metadata from the client */ function handleServerStreaming(call, handler, metadata) { - var stream = new ServerWritableStream(call, handler.serialize); + var stream = new ServerWritableStream(call, metadata, handler.serialize); stream.waitForCancel(); - stream.metadata = metadata; var batch = {}; batch[grpc.opType.RECV_MESSAGE] = true; call.startBatch(batch, function(err, result) { @@ -541,19 +658,32 @@ function handleServerStreaming(call, handler, metadata) { } /** + * User provided method to handle client streaming methods on the server. + * @callback grpc.Server~handleClientStreamingCall + * @param {grpc~ServerReadableStream} call The call object + * @param {grpc.Server~sendUnaryData} callback The callback to call to respond + * to the request + */ + +/** * Fully handle a client streaming call * @access private - * @param {grpc.Call} call The call to handle + * @param {grpc.internal~Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Metadata} metadata Metadata from the client + * @param {grpc~Server.handleClientStreamingCall} handler.func The handler + * function + * @param {grpc~deserialize} handler.deserialize The deserialization function + * for request data + * @param {grpc~serialize} handler.serialize The serialization function for + * response data + * @param {grpc.Metadata} metadata Metadata from the client */ function handleClientStreaming(call, handler, metadata) { - var stream = new ServerReadableStream(call, handler.deserialize); + var stream = new ServerReadableStream(call, metadata, handler.deserialize); stream.on('error', function(error) { handleError(call, error); }); stream.waitForCancel(); - stream.metadata = metadata; handler.func(stream, function(err, value, trailer, flags) { stream.terminate(); if (err) { @@ -568,17 +698,28 @@ function handleClientStreaming(call, handler, metadata) { } /** + * User provided method to handle bidirectional streaming calls on the server. + * @callback grpc.Server~handleBidiStreamingCall + * @param {grpc~ServerDuplexStream} call The call object + */ + +/** * Fully handle a bidirectional streaming call - * @access private - * @param {grpc.Call} call The call to handle + * @private + * @param {grpc.internal~Call} call The call to handle * @param {Object} handler Request handler object for the method that was called + * @param {grpc~Server.handleBidiStreamingCall} handler.func The handler + * function + * @param {grpc~deserialize} handler.deserialize The deserialization function + * for request data + * @param {grpc~serialize} handler.serialize The serialization function for + * response data * @param {Metadata} metadata Metadata from the client */ function handleBidiStreaming(call, handler, metadata) { - var stream = new ServerDuplexStream(call, handler.serialize, + var stream = new ServerDuplexStream(call, metadata, handler.serialize, handler.deserialize); stream.waitForCancel(); - stream.metadata = metadata; handler.func(stream); } @@ -592,96 +733,90 @@ var streamHandlers = { /** * Constructs a server object that stores request handlers and delegates * incoming requests to those handlers + * @memberof grpc * @constructor * @param {Object=} options Options that should be passed to the internal server * implementation + * @example + * var server = new grpc.Server(); + * server.addProtoService(protobuf_service_descriptor, service_implementation); + * server.bind('address:port', server_credential); + * server.start(); */ function Server(options) { this.handlers = {}; - var handlers = this.handlers; var server = new grpc.Server(options); this._server = server; this.started = false; +} + +/** + * Start the server and begin handling requests + */ +Server.prototype.start = function() { + if (this.started) { + throw new Error('Server is already running'); + } + var self = this; + this.started = true; + this._server.start(); /** - * Start the server and begin handling requests - * @this Server + * Handles the SERVER_RPC_NEW event. If there is a handler associated with + * the requested method, use that handler to respond to the request. Then + * wait for the next request + * @param {grpc.internal~Event} event The event to handle with tag + * SERVER_RPC_NEW */ - this.start = function() { - if (this.started) { - throw new Error('Server is already running'); + function handleNewCall(err, event) { + if (err) { + return; } - this.started = true; - server.start(); - /** - * Handles the SERVER_RPC_NEW event. If there is a handler associated with - * the requested method, use that handler to respond to the request. Then - * wait for the next request - * @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW - */ - function handleNewCall(err, event) { - if (err) { - return; - } - var details = event.new_call; - var call = details.call; - var method = details.method; - var metadata = Metadata._fromCoreRepresentation(details.metadata); - if (method === null) { - return; - } - server.requestCall(handleNewCall); - var handler; - if (handlers.hasOwnProperty(method)) { - handler = handlers[method]; - } else { - var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { - code: constants.status.UNIMPLEMENTED, - details: '', - metadata: {} - }; - batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; - call.startBatch(batch, function() {}); - return; - } - streamHandlers[handler.type](call, handler, metadata); + var details = event.new_call; + var call = details.call; + var method = details.method; + var metadata = Metadata._fromCoreRepresentation(details.metadata); + if (method === null) { + return; } - server.requestCall(handleNewCall); - }; - - /** - * Gracefully shuts down the server. The server will stop receiving new calls, - * and any pending calls will complete. The callback will be called when all - * pending calls have completed and the server is fully shut down. This method - * is idempotent with itself and forceShutdown. - * @param {function()} callback The shutdown complete callback - */ - this.tryShutdown = function(callback) { - server.tryShutdown(callback); - }; + self._server.requestCall(handleNewCall); + var handler; + if (self.handlers.hasOwnProperty(method)) { + handler = self.handlers[method]; + } else { + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: constants.status.UNIMPLEMENTED, + details: '', + metadata: {} + }; + batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + call.startBatch(batch, function() {}); + return; + } + streamHandlers[handler.type](call, handler, metadata); + } + this._server.requestCall(handleNewCall); +}; - /** - * Forcibly shuts down the server. The server will stop receiving new calls - * and cancel all pending calls. When it returns, the server has shut down. - * This method is idempotent with itself and tryShutdown, and it will trigger - * any outstanding tryShutdown callbacks. - */ - this.forceShutdown = function() { - server.forceShutdown(); - }; -} +/** + * Unified type for application handlers for all types of calls + * @typedef {(grpc.Server~handleUnaryCall + * |grpc.Server~handleClientStreamingCall + * |grpc.Server~handleServerStreamingCall + * |grpc.Server~handleBidiStreamingCall)} grpc.Server~handleCall + */ /** * Registers a handler to handle the named method. Fails if there already is * a handler for the given method. Returns true on success * @param {string} name The name of the method that the provided function should * handle/respond to. - * @param {function} handler Function that takes a stream of request values and - * returns a stream of response values - * @param {function(*):Buffer} serialize Serialization function for responses - * @param {function(Buffer):*} deserialize Deserialization function for requests + * @param {grpc.Server~handleCall} handler Function that takes a stream of + * request values and returns a stream of response values + * @param {grpc~serialize} serialize Serialization function for responses + * @param {grpc~deserialize} deserialize Deserialization function for requests * @param {string} type The streaming type of method that this handles * @return {boolean} True if the handler was set. False if a handler was already * set for that name. @@ -700,6 +835,27 @@ Server.prototype.register = function(name, handler, serialize, deserialize, return true; }; +/** + * Gracefully shuts down the server. The server will stop receiving new calls, + * and any pending calls will complete. The callback will be called when all + * pending calls have completed and the server is fully shut down. This method + * is idempotent with itself and forceShutdown. + * @param {function()} callback The shutdown complete callback + */ +Server.prototype.tryShutdown = function(callback) { + this._server.tryShutdown(callback); +}; + +/** + * Forcibly shuts down the server. The server will stop receiving new calls + * and cancel all pending calls. When it returns, the server has shut down. + * This method is idempotent with itself and tryShutdown, and it will trigger + * any outstanding tryShutdown callbacks. + */ +Server.prototype.forceShutdown = function() { + this._server.forceShutdown(); +}; + var unimplementedStatusResponse = { code: constants.status.UNIMPLEMENTED, details: 'The server does not implement this method' @@ -721,13 +877,10 @@ var defaultHandler = { }; /** - * Add a service to the server, with a corresponding implementation. If you are - * generating this from a proto file, you should instead use - * addProtoService. - * @param {Object<String, *>} service The service descriptor, as - * {@link module:src/common.getProtobufServiceAttrs} returns - * @param {Object<String, function>} implementation Map of method names to - * method implementation for the provided service. + * Add a service to the server, with a corresponding implementation. + * @param {grpc~ServiceDefinition} service The service descriptor + * @param {Object<String, grpc.Server~handleCall>} implementation Map of method + * names to method implementation for the provided service. */ Server.prototype.addService = function(service, implementation) { if (!_.isObject(service) || !_.isObject(implementation)) { @@ -788,10 +941,10 @@ var logAddProtoServiceDeprecationOnce = _.once(function() { /** * Add a proto service to the server, with a corresponding implementation - * @deprecated Use grpc.load and Server#addService instead + * @deprecated Use {@link grpc.Server#addService} instead * @param {Protobuf.Reflect.Service} service The proto service descriptor - * @param {Object<String, function>} implementation Map of method names to - * method implementation for the provided service. + * @param {Object<String, grpc.Server~handleCall>} implementation Map of method + * names to method implementation for the provided service. */ Server.prototype.addProtoService = function(service, implementation) { var options; @@ -815,10 +968,11 @@ Server.prototype.addProtoService = function(service, implementation) { }; /** - * Binds the server to the given port, with SSL enabled if creds is given + * Binds the server to the given port, with SSL disabled if creds is an + * insecure credentials object * @param {string} port The port that the server should bind on, in the format * "address:port" - * @param {ServerCredentials=} creds Server credential object to be used for + * @param {grpc.ServerCredentials} creds Server credential object to be used for * SSL. Pass an insecure credentials object for an insecure port. */ Server.prototype.bind = function(port, creds) { @@ -828,7 +982,4 @@ Server.prototype.bind = function(port, creds) { return this._server.addHttp2Port(port, creds); }; -/** - * @see module:src/server~Server - */ exports.Server = Server; diff --git a/src/node/test/common_test.js b/src/node/test/common_test.js index b7c2c6a8d6..db80207e22 100644 --- a/src/node/test/common_test.js +++ b/src/node/test/common_test.js @@ -100,7 +100,6 @@ describe('Proto message long int serialize and deserialize', function() { var longNumDeserialize = deserializeCls(messages_proto.LongValues, num_options); var serialized = longSerialize({int_64: pos_value}); - console.log(longDeserialize(serialized)); assert.strictEqual(typeof longDeserialize(serialized).int_64, 'string'); /* With the longsAsStrings option disabled, long values are represented as * objects with 3 keys: low, high, and unsigned */ diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index d2f0511af2..f8eaf62aaf 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -1110,6 +1110,18 @@ describe('Other conditions', function() { done(); }); }); + it('after the call has fully completed', function(done) { + var peer; + var call = client.unary({error: false}, function(err, data) { + assert.ifError(err); + setImmediate(function() { + assert.strictEqual(peer, call.getPeer()); + done(); + }); + }); + peer = call.getPeer(); + assert.strictEqual(typeof peer, 'string'); + }); }); }); describe('Call propagation', function() { @@ -1322,14 +1334,14 @@ describe('Cancelling surface client', function() { }); it('Should correctly cancel a unary call', function(done) { var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) { - assert.strictEqual(err.code, surface_client.status.CANCELLED); + assert.strictEqual(err.code, grpc.status.CANCELLED); done(); }); call.cancel(); }); it('Should correctly cancel a client stream call', function(done) { var call = client.sum(function(err, resp) { - assert.strictEqual(err.code, surface_client.status.CANCELLED); + assert.strictEqual(err.code, grpc.status.CANCELLED); done(); }); call.cancel(); @@ -1338,7 +1350,7 @@ describe('Cancelling surface client', function() { var call = client.fib({'limit': 5}); call.on('data', function() {}); call.on('error', function(error) { - assert.strictEqual(error.code, surface_client.status.CANCELLED); + assert.strictEqual(error.code, grpc.status.CANCELLED); done(); }); call.cancel(); @@ -1347,9 +1359,22 @@ describe('Cancelling surface client', function() { var call = client.divMany(); call.on('data', function() {}); call.on('error', function(error) { - assert.strictEqual(error.code, surface_client.status.CANCELLED); + assert.strictEqual(error.code, grpc.status.CANCELLED); done(); }); call.cancel(); }); + it('Should be idempotent', function(done) { + var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) { + assert.strictEqual(err.code, grpc.status.CANCELLED); + // Call asynchronously to try cancelling after call is fully completed + setImmediate(function() { + assert.doesNotThrow(function() { + call.cancel(); + }); + done(); + }); + }); + call.cancel(); + }); }); diff --git a/src/node/tools/package.json b/src/node/tools/package.json index a81aa87f4b..542d52d48b 100644 --- a/src/node/tools/package.json +++ b/src/node/tools/package.json @@ -1,6 +1,6 @@ { "name": "grpc-tools", - "version": "1.4.0-dev", + "version": "1.5.0-dev", "author": "Google Inc.", "description": "Tools for developing with gRPC on Node.js", "homepage": "http://www.grpc.io/", diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec index 2f29058b59..711814e7fa 100644 --- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec +++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec @@ -42,7 +42,7 @@ Pod::Spec.new do |s| # exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed # before them. s.name = '!ProtoCompiler-gRPCPlugin' - v = '1.4.0-dev' + v = '1.5.0-dev' s.version = v s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.' s.description = <<-DESC diff --git a/src/objective-c/GRPCClient/private/version.h b/src/objective-c/GRPCClient/private/version.h index c846f4214c..cacbce4644 100644 --- a/src/objective-c/GRPCClient/private/version.h +++ b/src/objective-c/GRPCClient/private/version.h @@ -38,4 +38,4 @@ // `tools/buildgen/generate_projects.sh`. -#define GRPC_OBJC_VERSION_STRING @"1.4.0-dev" +#define GRPC_OBJC_VERSION_STRING @"1.5.0-dev" diff --git a/src/php/composer.json b/src/php/composer.json index a4fba7e4f6..3a97e5fb41 100644 --- a/src/php/composer.json +++ b/src/php/composer.json @@ -2,7 +2,7 @@ "name": "grpc/grpc-dev", "description": "gRPC library for PHP - for Developement use only", "license": "BSD-3-Clause", - "version": "1.4.0", + "version": "1.5.0", "require": { "php": ">=5.5.0", "google/protobuf": "^v3.3.0" diff --git a/src/php/ext/grpc/version.h b/src/php/ext/grpc/version.h index 993ef2de27..303a63ec36 100644 --- a/src/php/ext/grpc/version.h +++ b/src/php/ext/grpc/version.h @@ -35,6 +35,6 @@ #ifndef VERSION_H #define VERSION_H -#define PHP_GRPC_VERSION "1.4.0" +#define PHP_GRPC_VERSION "1.5.0" #endif /* VERSION_H */ diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 4316449ac6..012ed8ec81 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -786,7 +786,7 @@ def _channel_managed_call_management(state): class _ChannelConnectivityState(object): def __init__(self, channel): - self.lock = threading.Lock() + self.lock = threading.RLock() self.channel = channel self.polling = False self.connectivity = None @@ -926,6 +926,11 @@ class Channel(grpc.Channel): self._call_state = _ChannelCallState(self._channel) self._connectivity_state = _ChannelConnectivityState(self._channel) + # TODO(https://github.com/grpc/grpc/issues/9884) + # Temporary work around UNAVAILABLE issues + # Remove this once c-core has retry support + _subscribe(self._connectivity_state, lambda *args: None, None) + def subscribe(self, callback, try_to_connect=None): _subscribe(self._connectivity_state, callback, try_to_connect) diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py index a0cb0dd067..3ae2602d20 100644 --- a/src/python/grpcio/grpc/_grpcio_metadata.py +++ b/src/python/grpcio/grpc/_grpcio_metadata.py @@ -29,4 +29,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!! -__version__ = """1.4.0.dev0""" +__version__ = """1.5.0.dev0""" diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 9770301d09..5f8075467d 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -313,6 +313,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/census/grpc_filter.c', 'src/core/ext/census/grpc_plugin.c', 'src/core/ext/census/initialize.c', + 'src/core/ext/census/intrusive_hash_map.c', 'src/core/ext/census/mlog.c', 'src/core/ext/census/operation.c', 'src/core/ext/census/placeholders.c', diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index ea4bc7ba20..f5bd29ff85 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -29,4 +29,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION='1.4.0.dev0' +VERSION='1.5.0.dev0' diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py index 26aa555e14..26a8301883 100644 --- a/src/python/grpcio_health_checking/grpc_version.py +++ b/src/python/grpcio_health_checking/grpc_version.py @@ -29,4 +29,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!! -VERSION='1.4.0.dev0' +VERSION='1.5.0.dev0' diff --git a/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py b/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py index cd896f32c3..0f399f8f8d 100644 --- a/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py +++ b/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py @@ -152,4 +152,4 @@ def enable_server_reflection(service_names, server, pool=None): pool: DescriptorPool object to use (descriptor_pool.Default() if None). """ reflection_pb2_grpc.add_ServerReflectionServicer_to_server( - ReflectionServicer(service_names), server, pool) + ReflectionServicer(service_names, pool), server) diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py index 978d6b4011..f16737df80 100644 --- a/src/python/grpcio_reflection/grpc_version.py +++ b/src/python/grpcio_reflection/grpc_version.py @@ -29,4 +29,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! -VERSION='1.4.0.dev0' +VERSION='1.5.0.dev0' diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py index 5f0b084884..28cf8a8a62 100644 --- a/src/python/grpcio_tests/grpc_version.py +++ b/src/python/grpcio_tests/grpc_version.py @@ -29,4 +29,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!! -VERSION='1.4.0.dev0' +VERSION='1.5.0.dev0' diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 9f7587faa6..126e8ac60d 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -32,6 +32,7 @@ "unit._invocation_defects_test.InvocationDefectsTest", "unit._metadata_code_details_test.MetadataCodeDetailsTest", "unit._metadata_test.MetadataTest", + "unit._reconnect_test.ReconnectTest", "unit._resource_exhausted_test.ResourceExhaustedTest", "unit._rpc_test.RPCTest", "unit._sanity._sanity_test.Sanity", diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py new file mode 100644 index 0000000000..6c316476b3 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py @@ -0,0 +1,70 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +"""Tests that a channel will reconnect if a connection is dropped""" + +import unittest + +import grpc +from grpc.framework.foundation import logging_pool + +from tests.unit.framework.common import test_constants + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x00\x00\x01' + +_UNARY_UNARY = '/test/UnaryUnary' + + +def _handle_unary_unary(unused_request, unused_servicer_context): + return _RESPONSE + + +class ReconnectTest(unittest.TestCase): + + def test_reconnect(self): + server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + handler = grpc.method_handlers_generic_handler('test', { + 'UnaryUnary': + grpc.unary_unary_rpc_method_handler(_handle_unary_unary) + }) + server = grpc.server(server_pool, (handler,)) + port = server.add_insecure_port('[::]:0') + server.start() + channel = grpc.insecure_channel('localhost:%d' % port) + multi_callable = channel.unary_unary(_UNARY_UNARY) + self.assertEqual(_RESPONSE, multi_callable(_REQUEST)) + server.stop(None) + server = grpc.server(server_pool, (handler,)) + server.add_insecure_port('[::]:{}'.format(port)) + server.start() + self.assertEqual(_RESPONSE, multi_callable(_REQUEST)) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb index d3e5373b0b..bed8c43405 100755 --- a/src/ruby/end2end/channel_closing_driver.rb +++ b/src/ruby/end2end/channel_closing_driver.rb @@ -61,6 +61,11 @@ def main 'channel is closed while connectivity is watched' end + client_exit_code = $CHILD_STATUS + if client_exit_code != 0 + fail "channel closing client failed, exit code #{client_exit_code}" + end + server_runner.stop end diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb index 80fb62899e..9910076dba 100755 --- a/src/ruby/end2end/channel_state_driver.rb +++ b/src/ruby/end2end/channel_state_driver.rb @@ -58,6 +58,9 @@ def main 'It likely hangs when ended abruptly' end + # The interrupt in the child process should cause it to + # exit a non-zero status, so don't check it here. + # This test mainly tries to catch deadlock. server_runner.stop end diff --git a/src/ruby/end2end/grpc_class_init_client.rb b/src/ruby/end2end/grpc_class_init_client.rb index ee79292119..e73ca76850 100755 --- a/src/ruby/end2end/grpc_class_init_client.rb +++ b/src/ruby/end2end/grpc_class_init_client.rb @@ -34,44 +34,110 @@ require_relative './end2end_common' -def main - grpc_class = '' - OptionParser.new do |opts| - opts.on('--grpc_class=P', String) do |p| - grpc_class = p +def construct_many(test_proc) + thds = [] + 4.times do + thds << Thread.new do + 20.times do + test_proc.call + end end - end.parse! + end + 20.times do + test_proc.call + end + thds.each(&:join) +end + +def run_gc_stress_test(test_proc) + GC.disable + construct_many(test_proc) - test_proc = nil + GC.enable + construct_many(test_proc) + + GC.start(full_mark: true, immediate_sweep: true) + construct_many(test_proc) +end + +def run_concurrency_stress_test(test_proc) + 100.times do + Thread.new do + test_proc.call + end + end + + test_proc.call + + fail 'exception thrown while child thread initing class' +end +# default (no gc_stress and no concurrency_stress) +def run_default_test(test_proc) + thd = Thread.new do + test_proc.call + end + test_proc.call + thd.join +end + +def get_test_proc(grpc_class) case grpc_class when 'channel' - test_proc = proc do + return proc do GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure) end when 'server' - test_proc = proc do + return proc do GRPC::Core::Server.new({}) end when 'channel_credentials' - test_proc = proc do + return proc do GRPC::Core::ChannelCredentials.new end when 'call_credentials' - test_proc = proc do + return proc do GRPC::Core::CallCredentials.new(proc { |noop| noop }) end when 'compression_options' - test_proc = proc do + return proc do GRPC::Core::CompressionOptions.new end else fail "bad --grpc_class=#{grpc_class} param" end +end - th = Thread.new { test_proc.call } - test_proc.call - th.join +def main + grpc_class = '' + stress_test = '' + OptionParser.new do |opts| + opts.on('--grpc_class=P', String) do |p| + grpc_class = p + end + opts.on('--stress_test=P') do |p| + stress_test = p + end + end.parse! + + test_proc = get_test_proc(grpc_class) + + # the different test configs need to be ran + # in separate processes, since each one tests + # clean shutdown in a different way + case stress_test + when 'gc' + p 'run gc stress' + run_gc_stress_test(test_proc) + when 'concurrency' + p 'run concurrency stress' + run_concurrency_stress_test(test_proc) + when '' + p 'run default' + run_default_test(test_proc) + else + fail "bad --stress_test=#{stress_test} param" + end end main diff --git a/src/ruby/end2end/grpc_class_init_driver.rb b/src/ruby/end2end/grpc_class_init_driver.rb index 764d029f14..c65ed547c5 100755 --- a/src/ruby/end2end/grpc_class_init_driver.rb +++ b/src/ruby/end2end/grpc_class_init_driver.rb @@ -38,29 +38,40 @@ def main call_credentials compression_options ) - native_grpc_classes.each do |grpc_class| - STDERR.puts 'start client' - this_dir = File.expand_path(File.dirname(__FILE__)) - client_path = File.join(this_dir, 'grpc_class_init_client.rb') - client_pid = Process.spawn(RbConfig.ruby, - client_path, - "--grpc_class=#{grpc_class}") - begin - Timeout.timeout(10) do - Process.wait(client_pid) + # there is room for false positives in this test, + # do a few runs for each config + 4.times do + native_grpc_classes.each do |grpc_class| + ['', 'gc', 'concurrency'].each do |stress_test_type| + STDERR.puts 'start client' + this_dir = File.expand_path(File.dirname(__FILE__)) + client_path = File.join(this_dir, 'grpc_class_init_client.rb') + client_pid = Process.spawn(RbConfig.ruby, + client_path, + "--grpc_class=#{grpc_class}", + "--stress_test=#{stress_test_type}") + begin + Timeout.timeout(10) do + Process.wait(client_pid) + end + rescue Timeout::Error + STDERR.puts "timeout waiting for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. ' \ + 'It likely hangs when the first constructed gRPC object has ' \ + "type: #{grpc_class}" + end + + client_exit_code = $CHILD_STATUS + # concurrency stress test type is expected to exit with a + # non-zero status due to an exception being raised + if client_exit_code != 0 && stress_test_type != 'concurrency' + fail "client failed, exit code #{client_exit_code}" + end end - rescue Timeout::Error - STDERR.puts "timeout waiting for client pid #{client_pid}" - Process.kill('SIGKILL', client_pid) - Process.wait(client_pid) - STDERR.puts 'killed client child' - raise 'Timed out waiting for client process. ' \ - 'It likely hangs when the first constructed gRPC object has ' \ - "type: #{grpc_class}" end - - client_exit_code = $CHILD_STATUS - fail "client failed, exit code #{client_exit_code}" if client_exit_code != 0 end end diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb new file mode 100755 index 0000000000..206ec8e801 --- /dev/null +++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb @@ -0,0 +1,63 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +Thread.abort_on_exception = true + +include GRPC::Core::ConnectivityStates + +def watch_state(ch) + thd = Thread.new do + state = ch.connectivity_state(false) + fail "non-idle state: #{state}" unless state == IDLE + ch.watch_connectivity_state(IDLE, Time.now + 360) + end + sleep 0.1 + thd.kill +end + +def main + channels = [] + 10.times do + ch = GRPC::Core::Channel.new('dummy_host', + nil, :this_channel_is_insecure) + watch_state(ch) + channels << ch + end + + # checking state should still be safe to call + channels.each do |c| + fail unless c.connectivity_state(false) == FATAL_FAILURE + end +end + +main diff --git a/src/ruby/end2end/sig_int_during_channel_watch_client.rb b/src/ruby/end2end/sig_int_during_channel_watch_client.rb index 389fc5ba33..0c6a374925 100755 --- a/src/ruby/end2end/sig_int_during_channel_watch_client.rb +++ b/src/ruby/end2end/sig_int_during_channel_watch_client.rb @@ -46,6 +46,8 @@ def main end end.parse! + trap('SIGINT') { exit 0 } + thd = Thread.new do child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}", {}, diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb index 670cda0919..79a8c133fa 100755 --- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb +++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb @@ -63,6 +63,11 @@ def main 'SIGINT is sent while there is an active connectivity_state call' end + client_exit_code = $CHILD_STATUS + if client_exit_code != 0 + fail "sig_int_during_channel_watch_client failed: #{client_exit_code}" + end + server_runner.stop end diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index a802183726..f65388448a 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -68,29 +68,53 @@ static VALUE grpc_rb_cChannel = Qnil; /* Used during the conversion of a hash to channel args during channel setup */ static VALUE grpc_rb_cChannelArgs; +typedef struct bg_watched_channel { + grpc_channel *channel; + // these fields must only be accessed under global_connection_polling_mu + struct bg_watched_channel *next; + int channel_destroyed; + int refcount; +} bg_watched_channel; + /* grpc_rb_channel wraps a grpc_channel. */ typedef struct grpc_rb_channel { VALUE credentials; - /* The actual channel */ - grpc_channel *wrapped; - int request_safe_destroy; - int safe_to_destroy; - grpc_connectivity_state current_connectivity_state; - - int mu_init_done; - int abort_watch_connectivity_state; - gpr_mu channel_mu; - gpr_cv channel_cv; + /* The actual channel (protected in a wrapper to tell when it's safe to + * destroy) */ + bg_watched_channel *bg_wrapped; } grpc_rb_channel; -/* Forward declarations of functions involved in temporary fix to - * https://github.com/grpc/grpc/issues/9941 */ +typedef enum { CONTINUOUS_WATCH, WATCH_STATE_API } watch_state_op_type; + +typedef struct watch_state_op { + watch_state_op_type op_type; + // from event.success + union { + struct { + int success; + // has been called back due to a cq next call + int called_back; + } api_callback_args; + struct { + bg_watched_channel *bg; + } continuous_watch_callback_args; + } op; +} watch_state_op; + +static bg_watched_channel *bg_watched_channel_list_head = NULL; + static void grpc_rb_channel_try_register_connection_polling( - grpc_rb_channel *wrapper); -static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); + bg_watched_channel *bg); static void *wait_until_channel_polling_thread_started_no_gil(void *); static void wait_until_channel_polling_thread_started_unblocking_func(void *); +static void *channel_init_try_register_connection_polling_without_gil( + void *arg); + +typedef struct channel_init_try_register_stack { + grpc_channel *channel; + grpc_rb_channel *wrapper; +} channel_init_try_register_stack; static grpc_completion_queue *channel_polling_cq; static gpr_mu global_connection_polling_mu; @@ -98,6 +122,42 @@ static gpr_cv global_connection_polling_cv; static int abort_channel_polling = 0; static int channel_polling_thread_started = 0; +static int bg_watched_channel_list_lookup(bg_watched_channel *bg); +static bg_watched_channel *bg_watched_channel_list_create_and_add( + grpc_channel *channel); +static void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg); +static void run_poll_channels_loop_unblocking_func(void *arg); + +// Needs to be called under global_connection_polling_mu +static void grpc_rb_channel_watch_connection_state_op_complete( + watch_state_op *op, int success) { + GPR_ASSERT(!op->op.api_callback_args.called_back); + op->op.api_callback_args.called_back = 1; + op->op.api_callback_args.success = success; + // wake up the watch API call thats waiting on this op + gpr_cv_broadcast(&global_connection_polling_cv); +} + +/* Avoids destroying a channel twice. */ +static void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) { + gpr_mu_lock(&global_connection_polling_mu); + GPR_ASSERT(bg_watched_channel_list_lookup(bg)); + if (!bg->channel_destroyed) { + grpc_channel_destroy(bg->channel); + bg->channel_destroyed = 1; + } + bg->refcount--; + if (bg->refcount == 0) { + bg_watched_channel_list_free_and_remove(bg); + } + gpr_mu_unlock(&global_connection_polling_mu); +} + +static void *channel_safe_destroy_without_gil(void *arg) { + grpc_rb_channel_safe_destroy((bg_watched_channel *)arg); + return NULL; +} + /* Destroys Channel instances. */ static void grpc_rb_channel_free(void *p) { grpc_rb_channel *ch = NULL; @@ -106,14 +166,13 @@ static void grpc_rb_channel_free(void *p) { }; ch = (grpc_rb_channel *)p; - if (ch->wrapped != NULL) { - grpc_rb_channel_safe_destroy(ch); - ch->wrapped = NULL; - } - - if (ch->mu_init_done) { - gpr_mu_destroy(&ch->channel_mu); - gpr_cv_destroy(&ch->channel_cv); + if (ch->bg_wrapped != NULL) { + /* assumption made here: it's ok to directly gpr_mu_lock the global + * connection polling mutex becuse we're in a finalizer, + * and we can count on this thread to not be interrupted or + * yield the gil. */ + grpc_rb_channel_safe_destroy(ch->bg_wrapped); + ch->bg_wrapped = NULL; } xfree(p); @@ -146,7 +205,7 @@ static rb_data_type_t grpc_channel_data_type = {"grpc_channel", /* Allocates grpc_rb_channel instances. */ static VALUE grpc_rb_channel_alloc(VALUE cls) { grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel); - wrapper->wrapped = NULL; + wrapper->bg_wrapped = NULL; wrapper->credentials = Qnil; return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper); } @@ -168,18 +227,21 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { grpc_channel_credentials *creds = NULL; char *target_chars = NULL; grpc_channel_args args; + channel_init_try_register_stack stack; + int stop_waiting_for_thread_start = 0; MEMZERO(&args, grpc_channel_args, 1); grpc_ruby_once_init(); rb_thread_call_without_gvl( - wait_until_channel_polling_thread_started_no_gil, NULL, - wait_until_channel_polling_thread_started_unblocking_func, NULL); + wait_until_channel_polling_thread_started_no_gil, + &stop_waiting_for_thread_start, + wait_until_channel_polling_thread_started_unblocking_func, + &stop_waiting_for_thread_start); /* "3" == 3 mandatory args */ rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); - wrapper->mu_init_done = 0; target_chars = StringValueCStr(target); grpc_rb_hash_convert_to_channel_args(channel_args, &args); if (TYPE(credentials) == T_SYMBOL) { @@ -196,24 +258,11 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { } GPR_ASSERT(ch); - - wrapper->wrapped = ch; - - gpr_mu_init(&wrapper->channel_mu); - gpr_cv_init(&wrapper->channel_cv); - wrapper->mu_init_done = 1; - - gpr_mu_lock(&wrapper->channel_mu); - wrapper->abort_watch_connectivity_state = 0; - wrapper->current_connectivity_state = - grpc_channel_check_connectivity_state(wrapper->wrapped, 0); - wrapper->safe_to_destroy = 0; - wrapper->request_safe_destroy = 0; - - gpr_cv_broadcast(&wrapper->channel_cv); - gpr_mu_unlock(&wrapper->channel_mu); - - grpc_rb_channel_try_register_connection_polling(wrapper); + stack.channel = ch; + stack.wrapper = wrapper; + rb_thread_call_without_gvl( + channel_init_try_register_connection_polling_without_gil, &stack, NULL, + NULL); if (args.args != NULL) { xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */ @@ -224,10 +273,31 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { return Qnil; } rb_ivar_set(self, id_target, target); - wrapper->wrapped = ch; return self; } +typedef struct get_state_stack { + bg_watched_channel *bg; + int try_to_connect; + int out; +} get_state_stack; + +static void *get_state_without_gil(void *arg) { + get_state_stack *stack = (get_state_stack *)arg; + + gpr_mu_lock(&global_connection_polling_mu); + GPR_ASSERT(abort_channel_polling || channel_polling_thread_started); + if (stack->bg->channel_destroyed) { + stack->out = GRPC_CHANNEL_SHUTDOWN; + } else { + stack->out = grpc_channel_check_connectivity_state(stack->bg->channel, + stack->try_to_connect); + } + gpr_mu_unlock(&global_connection_polling_mu); + + return NULL; +} + /* call-seq: ch.connectivity_state -> state @@ -240,59 +310,69 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, VALUE self) { VALUE try_to_connect_param = Qfalse; - int grpc_try_to_connect = 0; grpc_rb_channel *wrapper = NULL; - grpc_channel *ch = NULL; + get_state_stack stack; /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */ rb_scan_args(argc, argv, "01", &try_to_connect_param); - grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); - ch = wrapper->wrapped; - if (ch == NULL) { + if (wrapper->bg_wrapped == NULL) { rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, - grpc_try_to_connect)); + + stack.bg = wrapper->bg_wrapped; + stack.try_to_connect = RTEST(try_to_connect_param) ? 1 : 0; + rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL); + + return LONG2NUM(stack.out); } typedef struct watch_state_stack { - grpc_rb_channel *wrapper; + grpc_channel *channel; gpr_timespec deadline; int last_state; } watch_state_stack; -static void *watch_channel_state_without_gvl(void *arg) { +static void *wait_for_watch_state_op_complete_without_gvl(void *arg) { watch_state_stack *stack = (watch_state_stack *)arg; - gpr_timespec deadline = stack->deadline; - grpc_rb_channel *wrapper = stack->wrapper; - int last_state = stack->last_state; - void *return_value = (void *)0; + watch_state_op *op = NULL; + void *success = (void *)0; - gpr_mu_lock(&wrapper->channel_mu); - while (wrapper->current_connectivity_state == last_state && - !wrapper->request_safe_destroy && !wrapper->safe_to_destroy && - !wrapper->abort_watch_connectivity_state && - gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { - gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); + gpr_mu_lock(&global_connection_polling_mu); + // its unsafe to do a "watch" after "channel polling abort" because the cq has + // been shut down. + if (abort_channel_polling) { + gpr_mu_unlock(&global_connection_polling_mu); + return (void *)0; } - if (wrapper->current_connectivity_state != last_state) { - return_value = (void *)1; + op = gpr_zalloc(sizeof(watch_state_op)); + op->op_type = WATCH_STATE_API; + grpc_channel_watch_connectivity_state(stack->channel, stack->last_state, + stack->deadline, channel_polling_cq, + op); + + while (!op->op.api_callback_args.called_back) { + gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); } - gpr_mu_unlock(&wrapper->channel_mu); + if (op->op.api_callback_args.success) { + success = (void *)1; + } + gpr_free(op); + gpr_mu_unlock(&global_connection_polling_mu); - return return_value; + return success; } - -static void watch_channel_state_unblocking_func(void *arg) { - grpc_rb_channel *wrapper = (grpc_rb_channel *)arg; - gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); - gpr_mu_lock(&wrapper->channel_mu); - wrapper->abort_watch_connectivity_state = 1; - gpr_cv_broadcast(&wrapper->channel_cv); - gpr_mu_unlock(&wrapper->channel_mu); +static void wait_for_watch_state_op_complete_unblocking_func(void *arg) { + bg_watched_channel *bg = (bg_watched_channel *)arg; + gpr_mu_lock(&global_connection_polling_mu); + if (!bg->channel_destroyed) { + grpc_channel_destroy(bg->channel); + bg->channel_destroyed = 1; + } + gpr_mu_unlock(&global_connection_polling_mu); } /* Wait until the channel's connectivity state becomes different from @@ -307,11 +387,11 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE deadline) { grpc_rb_channel *wrapper = NULL; watch_state_stack stack; - void *out; + void *op_success = 0; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); - if (wrapper->wrapped == NULL) { + if (wrapper->bg_wrapped == NULL) { rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } @@ -323,16 +403,15 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, return Qnil; } - stack.wrapper = wrapper; - stack.deadline = grpc_rb_time_timeval(deadline, 0); + stack.channel = wrapper->bg_wrapped->channel; + stack.deadline = grpc_rb_time_timeval(deadline, 0), stack.last_state = NUM2LONG(last_state); - out = - rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, - watch_channel_state_unblocking_func, wrapper); - if (out) { - return Qtrue; - } - return Qfalse; + + op_success = rb_thread_call_without_gvl( + wait_for_watch_state_op_complete_without_gvl, &stack, + wait_for_watch_state_op_complete_unblocking_func, wrapper->bg_wrapped); + + return op_success ? Qtrue : Qfalse; } /* Create a call given a grpc_channel, in order to call method. The request @@ -344,7 +423,6 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, grpc_rb_channel *wrapper = NULL; grpc_call *call = NULL; grpc_call *parent_call = NULL; - grpc_channel *ch = NULL; grpc_completion_queue *cq = NULL; int flags = GRPC_PROPAGATE_DEFAULTS; grpc_slice method_slice; @@ -366,8 +444,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, cq = grpc_completion_queue_create_for_pluck(NULL); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); - ch = wrapper->wrapped; - if (ch == NULL) { + if (wrapper->bg_wrapped == NULL) { rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } @@ -375,8 +452,8 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, method_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method)); - call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice, - host_slice_ptr, + call = grpc_channel_create_call(wrapper->bg_wrapped->channel, parent_call, + flags, cq, method_slice, host_slice_ptr, grpc_rb_time_timeval(deadline, /* absolute time */ 0), NULL); @@ -401,15 +478,16 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, } /* Closes the channel, calling it's destroy method */ +/* Note this is an API-level call; a wrapped channel's finalizer doesn't call + * this */ static VALUE grpc_rb_channel_destroy(VALUE self) { grpc_rb_channel *wrapper = NULL; - grpc_channel *ch = NULL; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); - ch = wrapper->wrapped; - if (ch != NULL) { - grpc_rb_channel_safe_destroy(wrapper); - wrapper->wrapped = NULL; + if (wrapper->bg_wrapped != NULL) { + rb_thread_call_without_gvl(channel_safe_destroy_without_gil, + wrapper->bg_wrapped, NULL, NULL); + wrapper->bg_wrapped = NULL; } return Qnil; @@ -422,64 +500,110 @@ static VALUE grpc_rb_channel_get_target(VALUE self) { char *target = NULL; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); - target = grpc_channel_get_target(wrapper->wrapped); + target = grpc_channel_get_target(wrapper->bg_wrapped->channel); res = rb_str_new2(target); gpr_free(target); return res; } -// Either start polling channel connection state or signal that it's free to -// destroy. -// Not safe to call while a channel's connection state is polled. -static void grpc_rb_channel_try_register_connection_polling( - grpc_rb_channel *wrapper) { - grpc_connectivity_state conn_state; - gpr_timespec sleep_time = gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); - - GPR_ASSERT(wrapper); - GPR_ASSERT(wrapper->wrapped); - gpr_mu_lock(&wrapper->channel_mu); - if (wrapper->request_safe_destroy) { - wrapper->safe_to_destroy = 1; - gpr_cv_broadcast(&wrapper->channel_cv); - gpr_mu_unlock(&wrapper->channel_mu); - return; +/* Needs to be called under global_connection_polling_mu */ +static int bg_watched_channel_list_lookup(bg_watched_channel *target) { + bg_watched_channel *cur = bg_watched_channel_list_head; + + while (cur != NULL) { + if (cur == target) { + return 1; + } + cur = cur->next; } - gpr_mu_lock(&global_connection_polling_mu); - GPR_ASSERT(channel_polling_thread_started || abort_channel_polling); - conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); - if (conn_state != wrapper->current_connectivity_state) { - wrapper->current_connectivity_state = conn_state; - gpr_cv_broadcast(&wrapper->channel_cv); - } - // avoid posting work to the channel polling cq if it's been shutdown - if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) { - grpc_channel_watch_connectivity_state( - wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper); - } else { - wrapper->safe_to_destroy = 1; - gpr_cv_broadcast(&wrapper->channel_cv); + return 0; +} + +/* Needs to be called under global_connection_polling_mu */ +static bg_watched_channel *bg_watched_channel_list_create_and_add( + grpc_channel *channel) { + bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel)); + + watched->channel = channel; + watched->next = bg_watched_channel_list_head; + watched->refcount = 1; + bg_watched_channel_list_head = watched; + return watched; +} + +/* Needs to be called under global_connection_polling_mu */ +static void bg_watched_channel_list_free_and_remove( + bg_watched_channel *target) { + bg_watched_channel *bg = NULL; + + GPR_ASSERT(bg_watched_channel_list_lookup(target)); + GPR_ASSERT(target->channel_destroyed && target->refcount == 0); + if (bg_watched_channel_list_head == target) { + bg_watched_channel_list_head = target->next; + gpr_free(target); + return; + } + bg = bg_watched_channel_list_head; + while (bg != NULL && bg->next != NULL) { + if (bg->next == target) { + bg->next = bg->next->next; + gpr_free(target); + return; + } + bg = bg->next; } + GPR_ASSERT(0); +} + +/* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push + * it onto the background thread for constant watches. */ +static void *channel_init_try_register_connection_polling_without_gil( + void *arg) { + channel_init_try_register_stack *stack = + (channel_init_try_register_stack *)arg; + + gpr_mu_lock(&global_connection_polling_mu); + stack->wrapper->bg_wrapped = + bg_watched_channel_list_create_and_add(stack->channel); + grpc_rb_channel_try_register_connection_polling(stack->wrapper->bg_wrapped); gpr_mu_unlock(&global_connection_polling_mu); - gpr_mu_unlock(&wrapper->channel_mu); + return NULL; } -// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized -static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { - gpr_mu_lock(&wrapper->channel_mu); - wrapper->request_safe_destroy = 1; +// Needs to be called under global_connection_poolling_mu +static void grpc_rb_channel_try_register_connection_polling( + bg_watched_channel *bg) { + grpc_connectivity_state conn_state; + watch_state_op *op = NULL; - while (!wrapper->safe_to_destroy) { - gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, - gpr_inf_future(GPR_CLOCK_REALTIME)); + GPR_ASSERT(channel_polling_thread_started || abort_channel_polling); + + if (bg->refcount == 0) { + GPR_ASSERT(bg->channel_destroyed); + bg_watched_channel_list_free_and_remove(bg); + return; + } + GPR_ASSERT(bg->refcount == 1); + if (bg->channel_destroyed || abort_channel_polling) { + return; } - GPR_ASSERT(wrapper->safe_to_destroy); - gpr_mu_unlock(&wrapper->channel_mu); - grpc_channel_destroy(wrapper->wrapped); + conn_state = grpc_channel_check_connectivity_state(bg->channel, 0); + if (conn_state == GRPC_CHANNEL_SHUTDOWN) { + return; + } + GPR_ASSERT(bg_watched_channel_list_lookup(bg)); + // prevent bg from being free'd by GC while background thread is watching it + bg->refcount++; + + op = gpr_zalloc(sizeof(watch_state_op)); + op->op_type = CONTINUOUS_WATCH; + op->op.continuous_watch_callback_args.bg = bg; + grpc_channel_watch_connectivity_state(bg->channel, conn_state, + gpr_inf_future(GPR_CLOCK_REALTIME), + channel_polling_cq, op); } // Note this loop breaks out with a single call of @@ -490,6 +614,8 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { // early and falls back to current behavior. static void *run_poll_channels_loop_no_gil(void *arg) { grpc_event event; + watch_state_op *op = NULL; + bg_watched_channel *bg = NULL; (void)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin"); @@ -505,10 +631,22 @@ static void *run_poll_channels_loop_no_gil(void *arg) { if (event.type == GRPC_QUEUE_SHUTDOWN) { break; } + gpr_mu_lock(&global_connection_polling_mu); if (event.type == GRPC_OP_COMPLETE) { - grpc_rb_channel_try_register_connection_polling( - (grpc_rb_channel *)event.tag); + op = (watch_state_op *)event.tag; + if (op->op_type == CONTINUOUS_WATCH) { + bg = (bg_watched_channel *)op->op.continuous_watch_callback_args.bg; + bg->refcount--; + grpc_rb_channel_try_register_connection_polling(bg); + gpr_free(op); + } else if (op->op_type == WATCH_STATE_API) { + grpc_rb_channel_watch_connection_state_op_complete( + (watch_state_op *)event.tag, event.success); + } else { + GPR_ASSERT(0); + } } + gpr_mu_unlock(&global_connection_polling_mu); } grpc_completion_queue_destroy(channel_polling_cq); gpr_log(GPR_DEBUG, @@ -519,14 +657,36 @@ static void *run_poll_channels_loop_no_gil(void *arg) { // Notify the channel polling loop to cleanup and shutdown. static void run_poll_channels_loop_unblocking_func(void *arg) { + bg_watched_channel *bg = NULL; (void)arg; + gpr_mu_lock(&global_connection_polling_mu); gpr_log(GPR_DEBUG, - "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting " + "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting " "connection polling"); + // early out after first time through + if (abort_channel_polling) { + gpr_mu_unlock(&global_connection_polling_mu); + return; + } abort_channel_polling = 1; + + // force pending watches to end by switching to shutdown state + bg = bg_watched_channel_list_head; + while (bg != NULL) { + if (!bg->channel_destroyed) { + grpc_channel_destroy(bg->channel); + bg->channel_destroyed = 1; + } + bg = bg->next; + } + grpc_completion_queue_shutdown(channel_polling_cq); + gpr_cv_broadcast(&global_connection_polling_cv); gpr_mu_unlock(&global_connection_polling_mu); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: run_poll_channels_loop_unblocking_func - end aborting " + "connection polling"); } // Poll channel connectivity states in background thread without the GIL. @@ -542,10 +702,11 @@ static VALUE run_poll_channels_loop(VALUE arg) { } static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { - (void)arg; + int *stop_waiting = (int *)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start"); gpr_mu_lock(&global_connection_polling_mu); - while (!channel_polling_thread_started && !abort_channel_polling) { + while (!channel_polling_thread_started && !abort_channel_polling && + !*stop_waiting) { gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } @@ -556,15 +717,22 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { static void wait_until_channel_polling_thread_started_unblocking_func( void *arg) { - (void)arg; + int *stop_waiting = (int *)arg; gpr_mu_lock(&global_connection_polling_mu); gpr_log(GPR_DEBUG, - "GRPC_RUBY: " - "wait_until_channel_polling_thread_started_unblocking_func - begin " - "aborting connection polling"); + "GRPC_RUBY: interrupt wait for channel polling thread to start"); + *stop_waiting = 1; + gpr_cv_broadcast(&global_connection_polling_cv); + gpr_mu_unlock(&global_connection_polling_mu); +} + +static void *set_abort_channel_polling_without_gil(void *arg) { + (void)arg; + gpr_mu_lock(&global_connection_polling_mu); abort_channel_polling = 1; gpr_cv_broadcast(&global_connection_polling_cv); gpr_mu_unlock(&global_connection_polling_mu); + return NULL; } /* Temporary fix for @@ -592,10 +760,8 @@ void grpc_rb_channel_polling_thread_start() { if (!RTEST(background_thread)) { gpr_log(GPR_DEBUG, "GRPC_RUBY: failed to spawn channel polling thread"); - gpr_mu_lock(&global_connection_polling_mu); - abort_channel_polling = 1; - gpr_cv_broadcast(&global_connection_polling_cv); - gpr_mu_unlock(&global_connection_polling_mu); + rb_thread_call_without_gvl(set_abort_channel_polling_without_gil, NULL, + NULL, NULL); } } @@ -674,5 +840,5 @@ void Init_grpc_channel() { grpc_channel *grpc_rb_get_wrapped_channel(VALUE v) { grpc_rb_channel *wrapper = NULL; TypedData_Get_Struct(v, grpc_rb_channel, &grpc_channel_data_type, wrapper); - return wrapper->wrapped; + return wrapper->bg_wrapped->channel; } diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c index 9a3b56ddfb..71138265c8 100644 --- a/src/ruby/ext/grpc/rb_event_thread.c +++ b/src/ruby/ext/grpc/rb_event_thread.c @@ -105,16 +105,16 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) { grpc_rb_event *event = NULL; (void)param; gpr_mu_lock(&event_queue.mu); - while ((event = grpc_rb_event_queue_dequeue()) == NULL) { - gpr_cv_wait(&event_queue.cv, &event_queue.mu, - gpr_inf_future(GPR_CLOCK_REALTIME)); - if (event_queue.abort) { + while (!event_queue.abort) { + if ((event = grpc_rb_event_queue_dequeue()) != NULL) { gpr_mu_unlock(&event_queue.mu); - return NULL; + return event; } + gpr_cv_wait(&event_queue.cv, &event_queue.mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&event_queue.mu); - return event; + return NULL; } static void grpc_rb_event_unblocking_func(void *arg) { diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index 5be8861e0c..c319cd1391 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -292,11 +292,12 @@ static gpr_once g_once_init = GPR_ONCE_INIT; static void grpc_ruby_once_init_internal() { grpc_init(); - grpc_rb_event_queue_thread_start(); - grpc_rb_channel_polling_thread_start(); atexit(grpc_rb_shutdown); } +static VALUE bg_thread_init_rb_mu = Qundef; +static int bg_thread_init_done = 0; + void grpc_ruby_once_init() { /* ruby_vm_at_exit doesn't seem to be working. It would crash once every * blue moon, and some users are getting it repeatedly. See the discussions @@ -309,6 +310,18 @@ void grpc_ruby_once_init() { * schedule our initialization and destruction only once. */ gpr_once_init(&g_once_init, grpc_ruby_once_init_internal); + + // Avoid calling calling into ruby library (when creating threads here) + // in gpr_once_init. In general, it appears to be unsafe to call + // into the ruby library while holding a non-ruby mutex, because a gil yield + // could end up trying to lock onto that same mutex and deadlocking. + rb_mutex_lock(bg_thread_init_rb_mu); + if (!bg_thread_init_done) { + grpc_rb_event_queue_thread_start(); + grpc_rb_channel_polling_thread_start(); + bg_thread_init_done = 1; + } + rb_mutex_unlock(bg_thread_init_rb_mu); } void Init_grpc_c() { @@ -317,6 +330,9 @@ void Init_grpc_c() { return; } + bg_thread_init_rb_mu = rb_mutex_new(); + rb_global_variable(&bg_thread_init_rb_mu); + grpc_rb_mGRPC = rb_define_module("GRPC"); grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core"); grpc_rb_sNewServerRpc = rb_struct_define( diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 8f76420af6..f6a4ff6795 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -103,6 +103,7 @@ grpc_alarm_create_type grpc_alarm_create_import; grpc_alarm_cancel_type grpc_alarm_cancel_import; grpc_alarm_destroy_type grpc_alarm_destroy_import; grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import; +grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import; grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import; grpc_channel_create_call_type grpc_channel_create_call_import; grpc_channel_ping_type grpc_channel_ping_import; @@ -405,6 +406,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel"); grpc_alarm_destroy_import = (grpc_alarm_destroy_type) GetProcAddress(library, "grpc_alarm_destroy"); grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state"); + grpc_channel_num_external_connectivity_watchers_import = (grpc_channel_num_external_connectivity_watchers_type) GetProcAddress(library, "grpc_channel_num_external_connectivity_watchers"); grpc_channel_watch_connectivity_state_import = (grpc_channel_watch_connectivity_state_type) GetProcAddress(library, "grpc_channel_watch_connectivity_state"); grpc_channel_create_call_import = (grpc_channel_create_call_type) GetProcAddress(library, "grpc_channel_create_call"); grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 58467f97e8..0d64290b55 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -260,6 +260,9 @@ extern grpc_alarm_destroy_type grpc_alarm_destroy_import; typedef grpc_connectivity_state(*grpc_channel_check_connectivity_state_type)(grpc_channel *channel, int try_to_connect); extern grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import; #define grpc_channel_check_connectivity_state grpc_channel_check_connectivity_state_import +typedef int(*grpc_channel_num_external_connectivity_watchers_type)(grpc_channel *channel); +extern grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import; +#define grpc_channel_num_external_connectivity_watchers grpc_channel_num_external_connectivity_watchers_import typedef void(*grpc_channel_watch_connectivity_state_type)(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag); extern grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import; #define grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state_import diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index f30dff335f..e2e784d19f 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -29,5 +29,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '1.4.0.dev' + VERSION = '1.5.0.dev' end diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb index 940d68b9b0..c8a7856a09 100644 --- a/src/ruby/spec/channel_connection_spec.rb +++ b/src/ruby/spec/channel_connection_spec.rb @@ -28,6 +28,10 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc' +require 'timeout' + +include Timeout +include GRPC::Core # A test message class EchoMsg @@ -62,7 +66,7 @@ end EchoStub = EchoService.rpc_stub_class def start_server(port = 0) - @srv = GRPC::RpcServer.new + @srv = GRPC::RpcServer.new(pool_size: 1) server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure) @srv.handle(EchoService) @server_thd = Thread.new { @srv.run } @@ -138,4 +142,32 @@ describe 'channel connection behavior' do stop_server end + + it 'concurrent watches on the same channel' do + timeout(180) do + port = start_server + ch = GRPC::Core::Channel.new("localhost:#{port}", {}, + :this_channel_is_insecure) + stop_server + + thds = [] + 50.times do + thds << Thread.new do + while ch.connectivity_state(true) != ConnectivityStates::READY + ch.watch_connectivity_state( + ConnectivityStates::READY, Time.now + 60) + break + end + end + end + + sleep 0.01 + + start_server(port) + + thds.each(&:join) + + stop_server + end + end end diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb index 1f8d4afb95..4f74238df7 100644 --- a/src/ruby/tools/version.rb +++ b/src/ruby/tools/version.rb @@ -29,6 +29,6 @@ module GRPC module Tools - VERSION = '1.4.0.dev' + VERSION = '1.5.0.dev' end end |