diff options
author | Alistair Veitch <aveitch@google.com> | 2016-02-02 09:43:02 -0800 |
---|---|---|
committer | Alistair Veitch <aveitch@google.com> | 2016-02-02 09:43:02 -0800 |
commit | 75d5c0f024438944425cf1bd3163b07c6f6e55b5 (patch) | |
tree | 8130c65c6e654bc42187342cb3d8004379422dfa /src/core | |
parent | 7c43f4909242b62881389c7d4cfc9541c70151b6 (diff) |
post merge
Diffstat (limited to 'src/core')
83 files changed, 1213 insertions, 969 deletions
diff --git a/src/core/census/context.c b/src/core/census/context.c index cab58b653c..e60330de64 100644 --- a/src/core/census/context.c +++ b/src/core/census/context.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,16 +31,500 @@ * */ -#include "src/core/census/context.h" - -#include <string.h> #include <grpc/census.h> #include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/useful.h> +#include <stdbool.h> +#include <string.h> +#include "src/core/support/string.h" + +// Functions in this file support the public context API, including +// encoding/decoding as part of context propagation across RPC's. The overall +// requirements (in approximate priority order) for the +// context representation: +// 1. Efficient conversion to/from wire format +// 2. Minimal bytes used on-wire +// 3. Efficient context creation +// 4. Efficient lookup of tag value for a key +// 5. Efficient iteration over tags +// 6. Minimal memory footprint +// +// Notes on tradeoffs/decisions: +// * tag includes 1 byte length of key, as well as nil-terminating byte. These +// are to aid in efficient parsing and the ability to directly return key +// strings. This is more important than saving a single byte/tag on the wire. +// * The wire encoding uses only single byte values. This eliminates the need +// to handle endian-ness conversions. It also means there is a hard upper +// limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS. +// * Keep all tag information (keys/values/flags) in a single memory buffer, +// that can be directly copied to the wire. +// * Binary tags share the same structure as, but are encoded separately from, +// non-binary tags. This is primarily because non-binary tags are far more +// likely to be repeated across multiple RPC calls, so are more efficiently +// cached and compressed in any metadata schemes. + +// Structure representing a set of tags. Essentially a count of number of tags +// present, and pointer to a chunk of memory that contains the per-tag details. +struct tag_set { + int ntags; // number of tags. + int ntags_alloc; // ntags + number of deleted tags (total number of tags + // in all of kvm). This will always be == ntags, except during the process + // of building a new tag set. + size_t kvm_size; // number of bytes allocated for key/value storage. + size_t kvm_used; // number of bytes of used key/value memory + char *kvm; // key/value memory. Consists of repeated entries of: + // Offset Size Description + // 0 1 Key length, including trailing 0. (K) + // 1 1 Value length. (V) + // 2 1 Flags + // 3 K Key bytes + // 3 + K V Value bytes + // + // We refer to the first 3 entries as the 'tag header'. If extra values are + // introduced in the header, you will need to modify the TAG_HEADER_SIZE + // constant, the raw_tag structure (and everything that uses it) and the + // encode/decode functions appropriately. +}; + +// Number of bytes in tag header. +#define TAG_HEADER_SIZE 3 // key length (1) + value length (1) + flags (1) +// Offsets to tag header entries. +#define KEY_LEN_OFFSET 0 +#define VALUE_LEN_OFFSET 1 +#define FLAG_OFFSET 2 + +// raw_tag represents the raw-storage form of a tag in the kvm of a tag_set. +struct raw_tag { + uint8_t key_len; + uint8_t value_len; + uint8_t flags; + char *key; + char *value; +}; + +// Use a reserved flag bit for indication of deleted tag. +#define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED +#define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED) + +// Primary (external) representation of a context. Composed of 3 underlying +// tag_set structs, one for each of the binary/printable propagated tags, and +// one for everything else. This is to efficiently support tag +// encoding/decoding. +struct census_context { + struct tag_set tags[3]; + census_context_status status; +}; + +// Indices into the tags member of census_context +#define PROPAGATED_TAGS 0 +#define PROPAGATED_BINARY_TAGS 1 +#define LOCAL_TAGS 2 + +// Extract a raw tag given a pointer (raw) to the tag header. Allow for some +// extra bytes in the tag header (see encode/decode functions for usage: this +// allows for future expansion of the tag header). +static char *decode_tag(struct raw_tag *tag, char *header, int offset) { + tag->key_len = (uint8_t)(*header++); + tag->value_len = (uint8_t)(*header++); + tag->flags = (uint8_t)(*header++); + header += offset; + tag->key = header; + header += tag->key_len; + tag->value = header; + return header + tag->value_len; +} -/* Placeholder implementation only. */ +// Make a copy (in 'to') of an existing tag_set. +static void tag_set_copy(struct tag_set *to, const struct tag_set *from) { + memcpy(to, from, sizeof(struct tag_set)); + to->kvm = gpr_malloc(to->kvm_size); + memcpy(to->kvm, from->kvm, from->kvm_used); +} + +// Delete a tag from a tag_set, if it exists (returns true if it did). +static bool tag_set_delete_tag(struct tag_set *tags, const char *key, + size_t key_len) { + char *kvp = tags->kvm; + for (int i = 0; i < tags->ntags_alloc; i++) { + uint8_t *flags = (uint8_t *)(kvp + FLAG_OFFSET); + struct raw_tag tag; + kvp = decode_tag(&tag, kvp, 0); + if (CENSUS_TAG_IS_DELETED(tag.flags)) continue; + if ((key_len == tag.key_len) && (memcmp(key, tag.key, key_len) == 0)) { + *flags |= CENSUS_TAG_DELETED; + tags->ntags--; + return true; + } + } + return false; +} + +// Delete a tag from a context, return true if it existed. +static bool context_delete_tag(census_context *context, const census_tag *tag, + size_t key_len) { + return ( + tag_set_delete_tag(&context->tags[LOCAL_TAGS], tag->key, key_len) || + tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len) || + tag_set_delete_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag->key, + key_len)); +} + +// Add a tag to a tag_set. Return true on success, false if the tag could +// not be added because of constraints on tag set size. This function should +// not be called if the tag may already exist (in a non-deleted state) in +// the tag_set, as that would result in two tags with the same key. +static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, + size_t key_len) { + if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) { + return false; + } + const size_t tag_size = key_len + tag->value_len + TAG_HEADER_SIZE; + if (tags->kvm_used + tag_size > tags->kvm_size) { + // allocate new memory if needed + tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE; + char *new_kvm = gpr_malloc(tags->kvm_size); + memcpy(new_kvm, tags->kvm, tags->kvm_used); + gpr_free(tags->kvm); + tags->kvm = new_kvm; + } + char *kvp = tags->kvm + tags->kvm_used; + *kvp++ = (char)key_len; + *kvp++ = (char)tag->value_len; + // ensure reserved flags are not used. + *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS | + CENSUS_TAG_BINARY)); + memcpy(kvp, tag->key, key_len); + kvp += key_len; + memcpy(kvp, tag->value, tag->value_len); + tags->kvm_used += tag_size; + tags->ntags++; + tags->ntags_alloc++; + return true; +} + +// Add/modify/delete a tag to/in a context. Caller must validate that tag key +// etc. are valid. +static void context_modify_tag(census_context *context, const census_tag *tag, + size_t key_len) { + // First delete the tag if it is already present. + bool deleted = context_delete_tag(context, tag, key_len); + // Determine if we need to add it back. + bool call_add = tag->value != NULL && tag->value_len != 0; + bool added = false; + if (call_add) { + if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) { + if (CENSUS_TAG_IS_BINARY(tag->flags)) { + added = tag_set_add_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag, + key_len); + } else { + added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len); + } + } else { + added = tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len); + } + } + if (deleted) { + if (call_add) { + context->status.n_modified_tags++; + } else { + context->status.n_deleted_tags++; + } + } else { + if (added) { + context->status.n_added_tags++; + } else { + context->status.n_ignored_tags++; + } + } +} + +// Remove memory used for deleted tags from a tag set. Basic algorithm: +// 1) Walk through tag set to find first deleted tag. Record where it is. +// 2) Find the next not-deleted tag. Copy all of kvm from there to the end +// "over" the deleted tags +// 3) repeat #1 and #2 until we have seen all tags +// 4) if we are still looking for a not-deleted tag, then all the end portion +// of the kvm is deleted. Just reduce the used amount of memory by the +// appropriate amount. +static void tag_set_flatten(struct tag_set *tags) { + if (tags->ntags == tags->ntags_alloc) return; + bool found_deleted = false; // found a deleted tag. + char *kvp = tags->kvm; + char *dbase = NULL; // record location of deleted tag + for (int i = 0; i < tags->ntags_alloc; i++) { + struct raw_tag tag; + char *next_kvp = decode_tag(&tag, kvp, 0); + if (found_deleted) { + if (!CENSUS_TAG_IS_DELETED(tag.flags)) { + ptrdiff_t reduce = kvp - dbase; // #bytes in deleted tags + GPR_ASSERT(reduce > 0); + ptrdiff_t copy_size = tags->kvm + tags->kvm_used - kvp; + GPR_ASSERT(copy_size > 0); + memmove(dbase, kvp, (size_t)copy_size); + tags->kvm_used -= (size_t)reduce; + next_kvp -= reduce; + found_deleted = false; + } + } else { + if (CENSUS_TAG_IS_DELETED(tag.flags)) { + dbase = kvp; + found_deleted = true; + } + } + kvp = next_kvp; + } + if (found_deleted) { + GPR_ASSERT(dbase > tags->kvm); + tags->kvm_used = (size_t)(dbase - tags->kvm); + } + tags->ntags_alloc = tags->ntags; +} -size_t census_context_serialize(const census_context *context, char *buffer, - size_t buf_size) { - /* TODO(aveitch): implement serialization */ +census_context *census_context_create(const census_context *base, + const census_tag *tags, int ntags, + census_context_status const **status) { + census_context *context = gpr_malloc(sizeof(census_context)); + // If we are given a base, copy it into our new tag set. Otherwise set it + // to zero/NULL everything. + if (base == NULL) { + memset(context, 0, sizeof(census_context)); + } else { + tag_set_copy(&context->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]); + tag_set_copy(&context->tags[PROPAGATED_BINARY_TAGS], + &base->tags[PROPAGATED_BINARY_TAGS]); + tag_set_copy(&context->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]); + memset(&context->status, 0, sizeof(context->status)); + } + // Walk over the additional tags and, for those that aren't invalid, modify + // the context to add/replace/delete as required. + for (int i = 0; i < ntags; i++) { + const census_tag *tag = &tags[i]; + size_t key_len = strlen(tag->key) + 1; + // ignore the tag if it is too long/short. + if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN && + tag->value_len <= CENSUS_MAX_TAG_KV_LEN) { + context_modify_tag(context, tag, key_len); + } else { + context->status.n_invalid_tags++; + } + } + // Remove any deleted tags, update status if needed, and return. + tag_set_flatten(&context->tags[PROPAGATED_TAGS]); + tag_set_flatten(&context->tags[PROPAGATED_BINARY_TAGS]); + tag_set_flatten(&context->tags[LOCAL_TAGS]); + context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags; + context->status.n_propagated_binary_tags = + context->tags[PROPAGATED_BINARY_TAGS].ntags; + context->status.n_local_tags = context->tags[LOCAL_TAGS].ntags; + if (status) { + *status = &context->status; + } + return context; +} + +const census_context_status *census_context_get_status( + const census_context *context) { + return &context->status; +} + +void census_context_destroy(census_context *context) { + gpr_free(context->tags[PROPAGATED_TAGS].kvm); + gpr_free(context->tags[PROPAGATED_BINARY_TAGS].kvm); + gpr_free(context->tags[LOCAL_TAGS].kvm); + gpr_free(context); +} + +void census_context_initialize_iterator(const census_context *context, + census_context_iterator *iterator) { + iterator->context = context; + iterator->index = 0; + if (context->tags[PROPAGATED_TAGS].ntags != 0) { + iterator->base = PROPAGATED_TAGS; + iterator->kvm = context->tags[PROPAGATED_TAGS].kvm; + } else if (context->tags[PROPAGATED_BINARY_TAGS].ntags != 0) { + iterator->base = PROPAGATED_BINARY_TAGS; + iterator->kvm = context->tags[PROPAGATED_BINARY_TAGS].kvm; + } else if (context->tags[LOCAL_TAGS].ntags != 0) { + iterator->base = LOCAL_TAGS; + iterator->kvm = context->tags[LOCAL_TAGS].kvm; + } else { + iterator->base = -1; + } +} + +int census_context_next_tag(census_context_iterator *iterator, + census_tag *tag) { + if (iterator->base < 0) { + return 0; + } + struct raw_tag raw; + iterator->kvm = decode_tag(&raw, iterator->kvm, 0); + tag->key = raw.key; + tag->value = raw.value; + tag->value_len = raw.value_len; + tag->flags = raw.flags; + if (++iterator->index == iterator->context->tags[iterator->base].ntags) { + do { + if (iterator->base == LOCAL_TAGS) { + iterator->base = -1; + return 1; + } + } while (iterator->context->tags[++iterator->base].ntags == 0); + iterator->index = 0; + iterator->kvm = iterator->context->tags[iterator->base].kvm; + } + return 1; +} + +// Find a tag in a tag_set by key. Return true if found, false otherwise. +static bool tag_set_get_tag(const struct tag_set *tags, const char *key, + size_t key_len, census_tag *tag) { + char *kvp = tags->kvm; + for (int i = 0; i < tags->ntags; i++) { + struct raw_tag raw; + kvp = decode_tag(&raw, kvp, 0); + if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) { + tag->key = raw.key; + tag->value = raw.value; + tag->value_len = raw.value_len; + tag->flags = raw.flags; + return true; + } + } + return false; +} + +int census_context_get_tag(const census_context *context, const char *key, + census_tag *tag) { + size_t key_len = strlen(key) + 1; + if (key_len == 1) { + return 0; + } + if (tag_set_get_tag(&context->tags[PROPAGATED_TAGS], key, key_len, tag) || + tag_set_get_tag(&context->tags[PROPAGATED_BINARY_TAGS], key, key_len, + tag) || + tag_set_get_tag(&context->tags[LOCAL_TAGS], key, key_len, tag)) { + return 1; + } return 0; } + +// Context encoding and decoding functions. +// +// Wire format for tag_set's on the wire: +// +// First, a tag set header: +// +// offset bytes description +// 0 1 version number +// 1 1 number of bytes in this header. This allows for future +// expansion. +// 2 1 number of bytes in each tag header. +// 3 1 ntags value from tag set. +// +// This is followed by the key/value memory from struct tag_set. + +#define ENCODED_VERSION 0 // Version number +#define ENCODED_HEADER_SIZE 4 // size of tag set header + +// Encode a tag set. Returns 0 if buffer is too small. +static size_t tag_set_encode(const struct tag_set *tags, char *buffer, + size_t buf_size) { + if (buf_size < ENCODED_HEADER_SIZE + tags->kvm_used) { + return 0; + } + buf_size -= ENCODED_HEADER_SIZE; + *buffer++ = (char)ENCODED_VERSION; + *buffer++ = (char)ENCODED_HEADER_SIZE; + *buffer++ = (char)TAG_HEADER_SIZE; + *buffer++ = (char)tags->ntags; + if (tags->ntags == 0) { + return ENCODED_HEADER_SIZE; + } + memcpy(buffer, tags->kvm, tags->kvm_used); + return ENCODED_HEADER_SIZE + tags->kvm_used; +} + +char *census_context_encode(const census_context *context, char *buffer, + size_t buf_size, size_t *print_buf_size, + size_t *bin_buf_size) { + *print_buf_size = + tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size); + if (*print_buf_size == 0) { + return NULL; + } + char *b_buffer = buffer + *print_buf_size; + *bin_buf_size = tag_set_encode(&context->tags[PROPAGATED_BINARY_TAGS], + b_buffer, buf_size - *print_buf_size); + if (*bin_buf_size == 0) { + return NULL; + } + return b_buffer; +} + +// Decode a tag set. +static void tag_set_decode(struct tag_set *tags, const char *buffer, + size_t size) { + uint8_t version = (uint8_t)(*buffer++); + uint8_t header_size = (uint8_t)(*buffer++); + uint8_t tag_header_size = (uint8_t)(*buffer++); + tags->ntags = tags->ntags_alloc = (int)(*buffer++); + if (tags->ntags == 0) { + tags->ntags_alloc = 0; + tags->kvm_size = 0; + tags->kvm_used = 0; + tags->kvm = NULL; + return; + } + if (header_size != ENCODED_HEADER_SIZE) { + GPR_ASSERT(version != ENCODED_VERSION); + GPR_ASSERT(ENCODED_HEADER_SIZE < header_size); + buffer += (header_size - ENCODED_HEADER_SIZE); + } + tags->kvm_used = size - header_size; + tags->kvm_size = tags->kvm_used + CENSUS_MAX_TAG_KV_LEN; + tags->kvm = gpr_malloc(tags->kvm_size); + if (tag_header_size != TAG_HEADER_SIZE) { + // something new in the tag information. I don't understand it, so + // don't copy it over. + GPR_ASSERT(version != ENCODED_VERSION); + GPR_ASSERT(tag_header_size > TAG_HEADER_SIZE); + char *kvp = tags->kvm; + for (int i = 0; i < tags->ntags; i++) { + memcpy(kvp, buffer, TAG_HEADER_SIZE); + kvp += header_size; + struct raw_tag raw; + buffer = + decode_tag(&raw, (char *)buffer, tag_header_size - TAG_HEADER_SIZE); + memcpy(kvp, raw.key, (size_t)raw.key_len + raw.value_len); + kvp += raw.key_len + raw.value_len; + } + } else { + memcpy(tags->kvm, buffer, tags->kvm_used); + } +} + +census_context *census_context_decode(const char *buffer, size_t size, + const char *bin_buffer, size_t bin_size) { + census_context *context = gpr_malloc(sizeof(census_context)); + memset(&context->tags[LOCAL_TAGS], 0, sizeof(struct tag_set)); + if (buffer == NULL) { + memset(&context->tags[PROPAGATED_TAGS], 0, sizeof(struct tag_set)); + } else { + tag_set_decode(&context->tags[PROPAGATED_TAGS], buffer, size); + } + if (bin_buffer == NULL) { + memset(&context->tags[PROPAGATED_BINARY_TAGS], 0, sizeof(struct tag_set)); + } else { + tag_set_decode(&context->tags[PROPAGATED_BINARY_TAGS], bin_buffer, + bin_size); + } + memset(&context->status, 0, sizeof(context->status)); + context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags; + context->status.n_propagated_binary_tags = + context->tags[PROPAGATED_BINARY_TAGS].ntags; + // TODO(aveitch): check that BINARY flag is correct for each type. + return context; +} diff --git a/src/core/census/context.h b/src/core/census/context.h deleted file mode 100644 index 700bcf86cf..0000000000 --- a/src/core/census/context.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H -#define GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H - -#include <grpc/census.h> - -#define GRPC_CENSUS_MAX_ON_THE_WIRE_TAG_BYTES 2048 - -/* census_context is the in-memory representation of information needed to - * maintain tracing, RPC statistics and resource usage information. */ -struct census_context { - census_tag_set *tags; /* Opaque data structure for census tags. */ -}; - -#endif /* GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H */ diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index ee1f62f8c2..a8db32b9d5 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -91,7 +91,7 @@ static void client_start_transport_op(grpc_exec_ctx *exec_ctx, } static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr, - int success) { + bool success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; diff --git a/src/core/census/initialize.c b/src/core/census/initialize.c index b7af714e0b..ce7ec09b89 100644 --- a/src/core/census/initialize.c +++ b/src/core/census/initialize.c @@ -37,9 +37,7 @@ static int features_enabled = CENSUS_FEATURE_NONE; int census_initialize(int features) { if (features_enabled != CENSUS_FEATURE_NONE) { - return 1; - } - if (features == CENSUS_FEATURE_NONE) { + // Must have been a previous call to census_initialize; return error return 1; } features_enabled = features; diff --git a/src/core/census/placeholders.c b/src/core/census/placeholders.c new file mode 100644 index 0000000000..5829cc9460 --- /dev/null +++ b/src/core/census/placeholders.c @@ -0,0 +1,114 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/census.h> + +#include <grpc/support/log.h> + +/* Placeholders for the pending APIs */ + +census_tag_set *census_context_tag_set(census_context *context) { + (void)context; + abort(); +} + +int census_get_trace_record(census_trace_record *trace_record) { + (void)trace_record; + abort(); +} + +void census_record_values(census_context *context, census_value *values, + size_t nvalues) { + (void)context; + (void)values; + (void)nvalues; + abort(); +} + +void census_set_rpc_client_peer(census_context *context, const char *peer) { + (void)context; + (void)peer; + abort(); +} + +void census_trace_scan_end() { abort(); } + +int census_trace_scan_start(int consume) { + (void)consume; + abort(); +} + +const census_aggregation *census_view_aggregrations(const census_view *view) { + (void)view; + abort(); +} + +census_view *census_view_create(uint32_t metric_id, const census_tag_set *tags, + const census_aggregation *aggregations, + size_t naggregations) { + (void)metric_id; + (void)tags; + (void)aggregations; + (void)naggregations; + abort(); +} + +const census_tag_set *census_view_tags(const census_view *view) { + (void)view; + abort(); +} + +void census_view_delete(census_view *view) { + (void)view; + abort(); +} + +const census_view_data *census_view_get_data(const census_view *view) { + (void)view; + abort(); +} + +size_t census_view_metric(const census_view *view) { + (void)view; + abort(); +} + +size_t census_view_naggregations(const census_view *view) { + (void)view; + abort(); +} + +void census_view_reset(census_view *view) { + (void)view; + abort(); +} diff --git a/src/core/census/tag_set.c b/src/core/census/tag_set.c deleted file mode 100644 index 9b65a1dfa1..0000000000 --- a/src/core/census/tag_set.c +++ /dev/null @@ -1,535 +0,0 @@ -/* - * - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc/census.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/port_platform.h> -#include <grpc/support/useful.h> -#include <stdbool.h> -#include <string.h> -#include "src/core/support/string.h" - -// Functions in this file support the public tag_set API, as well as -// encoding/decoding tag_sets as part of context propagation across -// RPC's. The overall requirements (in approximate priority order) for the -// tag_set representations: -// 1. Efficient conversion to/from wire format -// 2. Minimal bytes used on-wire -// 3. Efficient tag set creation -// 4. Efficient lookup of value for a key -// 5. Efficient lookup of value for an index (to support iteration) -// 6. Minimal memory footprint -// -// Notes on tradeoffs/decisions: -// * tag includes 1 byte length of key, as well as nil-terminating byte. These -// are to aid in efficient parsing and the ability to directly return key -// strings. This is more important than saving a single byte/tag on the wire. -// * The wire encoding uses only single byte values. This eliminates the need -// to handle endian-ness conversions. It also means there is a hard upper -// limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS. -// * Keep all tag information (keys/values/flags) in a single memory buffer, -// that can be directly copied to the wire.14 -// * Binary tags share the same structure as, but are encoded separately from, -// non-binary tags. This is primarily because non-binary tags are far more -// likely to be repeated across multiple RPC calls, so are more efficiently -// cached and compressed in any metadata schemes. -// * all lengths etc. are restricted to one byte. This eliminates endian -// issues. - -// Structure representing a set of tags. Essentially a count of number of tags -// present, and pointer to a chunk of memory that contains the per-tag details. -struct tag_set { - int ntags; // number of tags. - int ntags_alloc; // ntags + number of deleted tags (total number of tags - // in all of kvm). This will always be == ntags, except during the process - // of building a new tag set. - size_t kvm_size; // number of bytes allocated for key/value storage. - size_t kvm_used; // number of bytes of used key/value memory - char *kvm; // key/value memory. Consists of repeated entries of: - // Offset Size Description - // 0 1 Key length, including trailing 0. (K) - // 1 1 Value length. (V) - // 2 1 Flags - // 3 K Key bytes - // 3 + K V Value bytes - // - // We refer to the first 3 entries as the 'tag header'. If extra values are - // introduced in the header, you will need to modify the TAG_HEADER_SIZE - // constant, the raw_tag structure (and everything that uses it) and the - // encode/decode functions appropriately. -}; - -// Number of bytes in tag header. -#define TAG_HEADER_SIZE 3 // key length (1) + value length (1) + flags (1) -// Offsets to tag header entries. -#define KEY_LEN_OFFSET 0 -#define VALUE_LEN_OFFSET 1 -#define FLAG_OFFSET 2 - -// raw_tag represents the raw-storage form of a tag in the kvm of a tag_set. -struct raw_tag { - uint8_t key_len; - uint8_t value_len; - uint8_t flags; - char *key; - char *value; -}; - -// Use a reserved flag bit for indication of deleted tag. -#define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED -#define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED) - -// Primary (external) representation of a tag set. Composed of 3 underlying -// tag_set structs, one for each of the binary/printable propagated tags, and -// one for everything else. This is to efficiently support tag -// encoding/decoding. -struct census_tag_set { - struct tag_set tags[3]; - census_tag_set_create_status status; -}; - -// Indices into the tags member of census_tag_set -#define PROPAGATED_TAGS 0 -#define PROPAGATED_BINARY_TAGS 1 -#define LOCAL_TAGS 2 - -// Extract a raw tag given a pointer (raw) to the tag header. Allow for some -// extra bytes in the tag header (see encode/decode functions for usage: this -// allows for future expansion of the tag header). -static char *decode_tag(struct raw_tag *tag, char *header, int offset) { - tag->key_len = (uint8_t)(*header++); - tag->value_len = (uint8_t)(*header++); - tag->flags = (uint8_t)(*header++); - header += offset; - tag->key = header; - header += tag->key_len; - tag->value = header; - return header + tag->value_len; -} - -// Make a copy (in 'to') of an existing tag_set. -static void tag_set_copy(struct tag_set *to, const struct tag_set *from) { - memcpy(to, from, sizeof(struct tag_set)); - to->kvm = gpr_malloc(to->kvm_size); - memcpy(to->kvm, from->kvm, from->kvm_used); -} - -// Delete a tag from a tag_set, if it exists (returns true if it did). -static bool tag_set_delete_tag(struct tag_set *tags, const char *key, - size_t key_len) { - char *kvp = tags->kvm; - for (int i = 0; i < tags->ntags_alloc; i++) { - uint8_t *flags = (uint8_t *)(kvp + FLAG_OFFSET); - struct raw_tag tag; - kvp = decode_tag(&tag, kvp, 0); - if (CENSUS_TAG_IS_DELETED(tag.flags)) continue; - if ((key_len == tag.key_len) && (memcmp(key, tag.key, key_len) == 0)) { - *flags |= CENSUS_TAG_DELETED; - tags->ntags--; - return true; - } - } - return false; -} - -// Delete a tag from a census_tag_set, return true if it existed. -static bool cts_delete_tag(census_tag_set *tags, const census_tag *tag, - size_t key_len) { - return (tag_set_delete_tag(&tags->tags[LOCAL_TAGS], tag->key, key_len) || - tag_set_delete_tag(&tags->tags[PROPAGATED_TAGS], tag->key, key_len) || - tag_set_delete_tag(&tags->tags[PROPAGATED_BINARY_TAGS], tag->key, - key_len)); -} - -// Add a tag to a tag_set. Return true on sucess, false if the tag could -// not be added because of constraints on tag set size. This function should -// not be called if the tag may already exist (in a non-deleted state) in -// the tag_set, as that would result in two tags with the same key. -static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, - size_t key_len) { - if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) { - return false; - } - const size_t tag_size = key_len + tag->value_len + TAG_HEADER_SIZE; - if (tags->kvm_used + tag_size > tags->kvm_size) { - // allocate new memory if needed - tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE; - char *new_kvm = gpr_malloc(tags->kvm_size); - memcpy(new_kvm, tags->kvm, tags->kvm_used); - gpr_free(tags->kvm); - tags->kvm = new_kvm; - } - char *kvp = tags->kvm + tags->kvm_used; - *kvp++ = (char)key_len; - *kvp++ = (char)tag->value_len; - // ensure reserved flags are not used. - *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS | - CENSUS_TAG_BINARY)); - memcpy(kvp, tag->key, key_len); - kvp += key_len; - memcpy(kvp, tag->value, tag->value_len); - tags->kvm_used += tag_size; - tags->ntags++; - tags->ntags_alloc++; - return true; -} - -// Add/modify/delete a tag to/in a census_tag_set. Caller must validate that -// tag key etc. are valid. -static void cts_modify_tag(census_tag_set *tags, const census_tag *tag, - size_t key_len) { - // First delete the tag if it is already present. - bool deleted = cts_delete_tag(tags, tag, key_len); - // Determine if we need to add it back. - bool call_add = tag->value != NULL && tag->value_len != 0; - bool added = false; - if (call_add) { - if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) { - if (CENSUS_TAG_IS_BINARY(tag->flags)) { - added = - tag_set_add_tag(&tags->tags[PROPAGATED_BINARY_TAGS], tag, key_len); - } else { - added = tag_set_add_tag(&tags->tags[PROPAGATED_TAGS], tag, key_len); - } - } else { - added = tag_set_add_tag(&tags->tags[LOCAL_TAGS], tag, key_len); - } - } - if (deleted) { - if (call_add) { - tags->status.n_modified_tags++; - } else { - tags->status.n_deleted_tags++; - } - } else { - if (added) { - tags->status.n_added_tags++; - } else { - tags->status.n_ignored_tags++; - } - } -} - -// Remove memory used for deleted tags from the tag set. Basic algorithm: -// 1) Walk through tag set to find first deleted tag. Record where it is. -// 2) Find the next not-deleted tag. Copy all of kvm from there to the end -// "over" the deleted tags -// 3) repeat #1 and #2 until we have seen all tags -// 4) if we are still looking for a not-deleted tag, then all the end portion -// of the kvm is deleted. Just reduce the used amount of memory by the -// appropriate amount. -static void tag_set_flatten(struct tag_set *tags) { - if (tags->ntags == tags->ntags_alloc) return; - bool found_deleted = false; // found a deleted tag. - char *kvp = tags->kvm; - char *dbase = NULL; // record location of deleted tag - for (int i = 0; i < tags->ntags_alloc; i++) { - struct raw_tag tag; - char *next_kvp = decode_tag(&tag, kvp, 0); - if (found_deleted) { - if (!CENSUS_TAG_IS_DELETED(tag.flags)) { - ptrdiff_t reduce = kvp - dbase; // #bytes in deleted tags - GPR_ASSERT(reduce > 0); - ptrdiff_t copy_size = tags->kvm + tags->kvm_used - kvp; - GPR_ASSERT(copy_size > 0); - memmove(dbase, kvp, (size_t)copy_size); - tags->kvm_used -= (size_t)reduce; - next_kvp -= reduce; - found_deleted = false; - } - } else { - if (CENSUS_TAG_IS_DELETED(tag.flags)) { - dbase = kvp; - found_deleted = true; - } - } - kvp = next_kvp; - } - if (found_deleted) { - GPR_ASSERT(dbase > tags->kvm); - tags->kvm_used = (size_t)(dbase - tags->kvm); - } - tags->ntags_alloc = tags->ntags; -} - -census_tag_set *census_tag_set_create( - const census_tag_set *base, const census_tag *tags, int ntags, - census_tag_set_create_status const **status) { - census_tag_set *new_ts = gpr_malloc(sizeof(census_tag_set)); - // If we are given a base, copy it into our new tag set. Otherwise set it - // to zero/NULL everything. - if (base == NULL) { - memset(new_ts, 0, sizeof(census_tag_set)); - } else { - tag_set_copy(&new_ts->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]); - tag_set_copy(&new_ts->tags[PROPAGATED_BINARY_TAGS], - &base->tags[PROPAGATED_BINARY_TAGS]); - tag_set_copy(&new_ts->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]); - memset(&new_ts->status, 0, sizeof(new_ts->status)); - } - // Walk over the additional tags and, for those that aren't invalid, modify - // the tag set to add/replace/delete as required. - for (int i = 0; i < ntags; i++) { - const census_tag *tag = &tags[i]; - size_t key_len = strlen(tag->key) + 1; - // ignore the tag if it is too long/short. - if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN && - tag->value_len <= CENSUS_MAX_TAG_KV_LEN) { - cts_modify_tag(new_ts, tag, key_len); - } else { - new_ts->status.n_invalid_tags++; - } - } - // Remove any deleted tags, update status if needed, and return. - tag_set_flatten(&new_ts->tags[PROPAGATED_TAGS]); - tag_set_flatten(&new_ts->tags[PROPAGATED_BINARY_TAGS]); - tag_set_flatten(&new_ts->tags[LOCAL_TAGS]); - new_ts->status.n_propagated_tags = new_ts->tags[PROPAGATED_TAGS].ntags; - new_ts->status.n_propagated_binary_tags = - new_ts->tags[PROPAGATED_BINARY_TAGS].ntags; - new_ts->status.n_local_tags = new_ts->tags[LOCAL_TAGS].ntags; - if (status) { - *status = &new_ts->status; - } - return new_ts; -} - -const census_tag_set_create_status *census_tag_set_get_create_status( - const census_tag_set *tags) { - return &tags->status; -} - -void census_tag_set_destroy(census_tag_set *tags) { - gpr_free(tags->tags[PROPAGATED_TAGS].kvm); - gpr_free(tags->tags[PROPAGATED_BINARY_TAGS].kvm); - gpr_free(tags->tags[LOCAL_TAGS].kvm); - gpr_free(tags); -} - -// Initialize a tag set iterator. Must be called before first use of the -// iterator. -void census_tag_set_initialize_iterator(const census_tag_set *tags, - census_tag_set_iterator *iterator) { - iterator->tags = tags; - iterator->index = 0; - if (tags->tags[PROPAGATED_TAGS].ntags != 0) { - iterator->base = PROPAGATED_TAGS; - iterator->kvm = tags->tags[PROPAGATED_TAGS].kvm; - } else if (tags->tags[PROPAGATED_BINARY_TAGS].ntags != 0) { - iterator->base = PROPAGATED_BINARY_TAGS; - iterator->kvm = tags->tags[PROPAGATED_BINARY_TAGS].kvm; - } else if (tags->tags[LOCAL_TAGS].ntags != 0) { - iterator->base = LOCAL_TAGS; - iterator->kvm = tags->tags[LOCAL_TAGS].kvm; - } else { - iterator->base = -1; - } -} - -// Get the contents of the "next" tag in the tag set. If there are no more -// tags in the tag set, returns 0 (and 'tag' contents will be unchanged), -// otherwise returns 1. */ -int census_tag_set_next_tag(census_tag_set_iterator *iterator, - census_tag *tag) { - if (iterator->base < 0) { - return 0; - } - struct raw_tag raw; - iterator->kvm = decode_tag(&raw, iterator->kvm, 0); - tag->key = raw.key; - tag->value = raw.value; - tag->value_len = raw.value_len; - tag->flags = raw.flags; - if (++iterator->index == iterator->tags->tags[iterator->base].ntags) { - do { - if (iterator->base == LOCAL_TAGS) { - iterator->base = -1; - return 1; - } - } while (iterator->tags->tags[++iterator->base].ntags == 0); - iterator->index = 0; - iterator->kvm = iterator->tags->tags[iterator->base].kvm; - } - return 1; -} - -// Find a tag in a tag_set by key. Return true if found, false otherwise. -static bool tag_set_get_tag_by_key(const struct tag_set *tags, const char *key, - size_t key_len, census_tag *tag) { - char *kvp = tags->kvm; - for (int i = 0; i < tags->ntags; i++) { - struct raw_tag raw; - kvp = decode_tag(&raw, kvp, 0); - if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) { - tag->key = raw.key; - tag->value = raw.value; - tag->value_len = raw.value_len; - tag->flags = raw.flags; - return true; - } - } - return false; -} - -int census_tag_set_get_tag_by_key(const census_tag_set *tags, const char *key, - census_tag *tag) { - size_t key_len = strlen(key) + 1; - if (key_len == 1) { - return 0; - } - if (tag_set_get_tag_by_key(&tags->tags[PROPAGATED_TAGS], key, key_len, tag) || - tag_set_get_tag_by_key(&tags->tags[PROPAGATED_BINARY_TAGS], key, key_len, - tag) || - tag_set_get_tag_by_key(&tags->tags[LOCAL_TAGS], key, key_len, tag)) { - return 1; - } - return 0; -} - -// tag_set encoding and decoding functions. -// -// Wire format for tag sets on the wire: -// -// First, a tag set header: -// -// offset bytes description -// 0 1 version number -// 1 1 number of bytes in this header. This allows for future -// expansion. -// 2 1 number of bytes in each tag header. -// 3 1 ntags value from tag set. -// -// This is followed by the key/value memory from struct tag_set. - -#define ENCODED_VERSION 0 // Version number -#define ENCODED_HEADER_SIZE 4 // size of tag set header - -// Encode a tag set. Returns 0 if buffer is too small. -static size_t tag_set_encode(const struct tag_set *tags, char *buffer, - size_t buf_size) { - if (buf_size < ENCODED_HEADER_SIZE + tags->kvm_used) { - return 0; - } - buf_size -= ENCODED_HEADER_SIZE; - *buffer++ = (char)ENCODED_VERSION; - *buffer++ = (char)ENCODED_HEADER_SIZE; - *buffer++ = (char)TAG_HEADER_SIZE; - *buffer++ = (char)tags->ntags; - if (tags->ntags == 0) { - return ENCODED_HEADER_SIZE; - } - memcpy(buffer, tags->kvm, tags->kvm_used); - return ENCODED_HEADER_SIZE + tags->kvm_used; -} - -char *census_tag_set_encode(const census_tag_set *tags, char *buffer, - size_t buf_size, size_t *print_buf_size, - size_t *bin_buf_size) { - *print_buf_size = - tag_set_encode(&tags->tags[PROPAGATED_TAGS], buffer, buf_size); - if (*print_buf_size == 0) { - return NULL; - } - char *b_buffer = buffer + *print_buf_size; - *bin_buf_size = tag_set_encode(&tags->tags[PROPAGATED_BINARY_TAGS], b_buffer, - buf_size - *print_buf_size); - if (*bin_buf_size == 0) { - return NULL; - } - return b_buffer; -} - -// Decode a tag set. -static void tag_set_decode(struct tag_set *tags, const char *buffer, - size_t size) { - uint8_t version = (uint8_t)(*buffer++); - uint8_t header_size = (uint8_t)(*buffer++); - uint8_t tag_header_size = (uint8_t)(*buffer++); - tags->ntags = tags->ntags_alloc = (int)(*buffer++); - if (tags->ntags == 0) { - tags->ntags_alloc = 0; - tags->kvm_size = 0; - tags->kvm_used = 0; - tags->kvm = NULL; - return; - } - if (header_size != ENCODED_HEADER_SIZE) { - GPR_ASSERT(version != ENCODED_VERSION); - GPR_ASSERT(ENCODED_HEADER_SIZE < header_size); - buffer += (header_size - ENCODED_HEADER_SIZE); - } - tags->kvm_used = size - header_size; - tags->kvm_size = tags->kvm_used + CENSUS_MAX_TAG_KV_LEN; - tags->kvm = gpr_malloc(tags->kvm_size); - if (tag_header_size != TAG_HEADER_SIZE) { - // something new in the tag information. I don't understand it, so - // don't copy it over. - GPR_ASSERT(version != ENCODED_VERSION); - GPR_ASSERT(tag_header_size > TAG_HEADER_SIZE); - char *kvp = tags->kvm; - for (int i = 0; i < tags->ntags; i++) { - memcpy(kvp, buffer, TAG_HEADER_SIZE); - kvp += header_size; - struct raw_tag raw; - buffer = - decode_tag(&raw, (char *)buffer, tag_header_size - TAG_HEADER_SIZE); - memcpy(kvp, raw.key, (size_t)raw.key_len + raw.value_len); - kvp += raw.key_len + raw.value_len; - } - } else { - memcpy(tags->kvm, buffer, tags->kvm_used); - } -} - -census_tag_set *census_tag_set_decode(const char *buffer, size_t size, - const char *bin_buffer, size_t bin_size) { - census_tag_set *new_ts = gpr_malloc(sizeof(census_tag_set)); - memset(&new_ts->tags[LOCAL_TAGS], 0, sizeof(struct tag_set)); - if (buffer == NULL) { - memset(&new_ts->tags[PROPAGATED_TAGS], 0, sizeof(struct tag_set)); - } else { - tag_set_decode(&new_ts->tags[PROPAGATED_TAGS], buffer, size); - } - if (bin_buffer == NULL) { - memset(&new_ts->tags[PROPAGATED_BINARY_TAGS], 0, sizeof(struct tag_set)); - } else { - tag_set_decode(&new_ts->tags[PROPAGATED_BINARY_TAGS], bin_buffer, bin_size); - } - memset(&new_ts->status, 0, sizeof(new_ts->status)); - new_ts->status.n_propagated_tags = new_ts->tags[PROPAGATED_TAGS].ntags; - new_ts->status.n_propagated_binary_tags = - new_ts->tags[PROPAGATED_BINARY_TAGS].ntags; - // TODO(aveitch): check that BINARY flag is correct for each type. - return new_ts; -} diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 487db1119a..0427ce0b8d 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,6 +35,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/support/string.h" +#include <grpc/census.h> #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> @@ -119,10 +120,10 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) { if (a == NULL) return 0; for (i = 0; i < a->num_args; i++) { if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) { - return a->args[i].value.integer != 0; + return a->args[i].value.integer != 0 && census_enabled(); } } - return 0; + return census_enabled(); } grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index a92a6ecaf2..7176c01b05 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -135,7 +135,7 @@ static void on_lb_policy_state_changed_locked( } static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { lb_policy_connectivity_watcher *w = arg; gpr_mu_lock(&w->chand->mu_config); @@ -161,7 +161,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, } static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; @@ -191,7 +191,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { - grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures); + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, + NULL); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); @@ -249,7 +250,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); GPR_ASSERT(op->set_accept_stream == NULL); if (op->bind_pollset != NULL) { @@ -268,7 +269,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, 0); + grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, false, NULL); } else { grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; @@ -310,15 +311,15 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); -static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) { continue_picking_args *cpa = arg; if (!success) { - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); } else if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->connected_subchannel, cpa->on_ready)) { - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1); + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL); } gpr_free(cpa); } @@ -346,7 +347,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); } } gpr_mu_unlock(&chand->mu_config); @@ -497,7 +498,7 @@ typedef struct { } external_connectivity_watcher; static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index 2c0b07d8bf..b1e7155773 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -79,7 +79,7 @@ typedef struct client_uchannel_channel_data { typedef grpc_subchannel_call_holder call_data; static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { channel_data *chand = arg; grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, chand->subchannel_connectivity, @@ -105,7 +105,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); GPR_ASSERT(op->set_accept_stream == NULL); GPR_ASSERT(op->bind_pollset == NULL); diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 3de1ef8a35..3e7ca08fd2 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -153,7 +153,7 @@ static void process_send_initial_metadata( static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); -static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, int success) { +static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, bool success) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; gpr_slice_buffer_reset_and_unref(&calld->slices); @@ -183,7 +183,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_next_op(exec_ctx, elem, &calld->send_op); } -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, int success) { +static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, bool success) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 65cfb778bb..43eee046b8 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -1,5 +1,5 @@ /* - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -80,7 +80,7 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { return md; } -static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) { +static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; client_recv_filter_args a; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index a10b105494..bb75323933 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -131,7 +131,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } } -static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) { +static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (success) { diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 1e8f4ba1b6..3ad9fd9efb 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -43,9 +43,9 @@ #define CANCELLED_CALL ((grpc_subchannel_call *)1) static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder, - int success); + bool success); static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args, - int success); + bool success); static void add_waiting_locked(grpc_subchannel_call_holder *holder, grpc_transport_stream_op *op); @@ -166,7 +166,7 @@ retry: GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); } -static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_subchannel_call_holder *holder = arg; grpc_subchannel_call *call; gpr_mu_lock(&holder->mu); @@ -209,10 +209,11 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), 1); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true, + NULL); } -static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, int success) { +static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) { retry_ops_args *a = args; size_t i; for (i = 0; i < a->nops; i++) { @@ -240,9 +241,10 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { size_t i; for (i = 0; i < holder->waiting_ops_count; i++) { - grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false, + NULL); grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready, - 0); + false, NULL); } holder->waiting_ops_count = 0; } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index e6ddb1a11f..459bbebb68 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -76,7 +76,7 @@ typedef struct { } pick_first_lb_policy; #define GET_SELECTED(p) \ - ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected)) + ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected)) void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; @@ -121,7 +121,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { *pp->target = NULL; grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); pp = next; } @@ -140,7 +140,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -209,7 +209,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, } static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { pick_first_lb_policy *p = arg; size_t i; size_t num_subchannels = p->num_subchannels; @@ -230,7 +230,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, } static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { pick_first_lb_policy *p = arg; grpc_subchannel *selected_subchannel; pending_pick *pp; @@ -268,19 +268,19 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, selected = grpc_subchannel_get_connected_subchannel(selected_subchannel); GPR_ASSERT(selected != NULL); - gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected); GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first"); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); - grpc_exec_ctx_enqueue(exec_ctx, - grpc_closure_create(destroy_subchannels, p), 1); + gpr_atm_rel_store(&p->selected, (gpr_atm)selected); + grpc_exec_ctx_enqueue( + exec_ctx, grpc_closure_create(destroy_subchannels, p), true, NULL); /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( @@ -327,7 +327,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, @@ -374,7 +374,7 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (selected) { grpc_connected_subchannel_ping(exec_ctx, selected, closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); } } diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index d487456363..b1171c45b0 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -237,7 +237,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); gpr_free(pp); } grpc_connectivity_state_set(exec_ctx, &p->state_tracker, @@ -263,7 +263,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -336,7 +336,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, } static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; @@ -376,7 +376,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_subchannel_notify_on_state_change( @@ -428,7 +428,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } } else { @@ -479,7 +479,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel_ping(exec_ctx, target, closure); } else { gpr_mu_unlock(&p->mu); - grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); } } diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 28ca30b946..376b6b3d76 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -93,7 +93,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -182,7 +182,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, if (r->resolved_config) { grpc_client_config_ref(r->resolved_config); } - grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); r->next_completion = NULL; r->published_version = r->resolved_version; } diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 6343259246..68910ad975 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -98,7 +98,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -153,7 +153,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; *r->target_config = cfg; - grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); r->next_completion = NULL; } } diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 4924ca77d6..166738e768 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -420,7 +420,7 @@ static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, if (r->resolved_config != NULL) { grpc_client_config_ref(r->resolved_config); } - grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); r->next_completion = NULL; r->published_version = r->resolved_version; } diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 2992da8b79..0a996a1e8b 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -145,7 +145,7 @@ struct grpc_subchannel_call { static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, - int iomgr_success); + bool iomgr_success); #ifdef GRPC_STREAM_REFCOUNT_DEBUG #define REF_REASON reason @@ -175,7 +175,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, */ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { grpc_connected_subchannel *c = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); @@ -198,7 +198,7 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, */ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { grpc_subchannel *c = arg; gpr_free((void *)c->filters); grpc_channel_args_destroy(c->args); @@ -268,7 +268,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), - 1); + true, NULL); } } @@ -284,9 +284,13 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->connector = connector; grpc_connector_ref(c->connector); c->num_filters = args->filter_count; - c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters); - memcpy((void *)c->filters, args->filters, - sizeof(grpc_channel_filter *) * c->num_filters); + if (c->num_filters > 0) { + c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters); + memcpy((void *)c->filters, args->filters, + sizeof(grpc_channel_filter *) * c->num_filters); + } else { + c->filters = NULL; + } c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); grpc_pollset_set_init(&c->pollset_set); @@ -337,7 +341,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { } static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { @@ -409,7 +413,7 @@ void grpc_connected_subchannel_process_transport_op( } static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, - int iomgr_success) { + bool iomgr_success) { state_watcher *sw = p; grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; @@ -483,7 +487,9 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; filters = gpr_malloc(sizeof(*filters) * num_filters); - memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters); + if (c->num_filters > 0) { + memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters); + } memcpy((void *)(filters + c->num_filters), c->connecting_result.filters, sizeof(*filters) * c->connecting_result.num_filters); filters[num_filters - 1] = &grpc_connected_channel_filter; @@ -519,7 +525,12 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { } /* publish */ - GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con)); + /* TODO(ctiller): this full barrier seems to clear up a TSAN failure. + I'd have expected the rel_cas below to be enough, but + seemingly it's not. + Re-evaluate if we really need this. */ + gpr_atm_full_barrier(); + GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); c->connecting = 0; /* setup subchannel watching connected subchannel for changes; subchannel ref @@ -583,7 +594,7 @@ static void update_reconnect_parameters(grpc_subchannel *c) { gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); } -static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { +static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); c->have_alarm = 0; @@ -600,7 +611,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { } static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { grpc_subchannel *c = arg; if (c->connecting_result.transport != NULL) { @@ -636,7 +647,7 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { */ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, - int success) { + bool success) { grpc_subchannel_call *c = call; GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index b5cd8d8d2a..71237bb614 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -114,13 +114,13 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, gpr_free(req); } -static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success); +static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success); static void do_read(grpc_exec_ctx *exec_ctx, internal_request *req) { grpc_endpoint_read(exec_ctx, req->ep, &req->incoming, &req->on_read); } -static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { internal_request *req = user_data; size_t i; @@ -147,7 +147,7 @@ static void on_written(grpc_exec_ctx *exec_ctx, internal_request *req) { do_read(exec_ctx, req); } -static void done_write(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void done_write(grpc_exec_ctx *exec_ctx, void *arg, bool success) { internal_request *req = arg; if (success) { on_written(exec_ctx, req); @@ -175,7 +175,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, start_write(exec_ctx, req); } -static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) { internal_request *req = arg; if (!req->ep) { diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c index 7646a88ac5..3a96f7385f 100644 --- a/src/core/iomgr/closure.c +++ b/src/core/iomgr/closure.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -43,7 +43,7 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, } void grpc_closure_list_add(grpc_closure_list *closure_list, - grpc_closure *closure, int success) { + grpc_closure *closure, bool success) { if (closure == NULL) return; closure->final_data = (success != 0); if (closure_list->head == NULL) { @@ -54,7 +54,7 @@ void grpc_closure_list_add(grpc_closure_list *closure_list, closure_list->tail = closure; } -int grpc_closure_list_empty(grpc_closure_list closure_list) { +bool grpc_closure_list_empty(grpc_closure_list closure_list) { return closure_list.head == NULL; } @@ -77,7 +77,7 @@ typedef struct { grpc_closure wrapper; } wrapped_closure; -static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, bool success) { wrapped_closure *wc = arg; grpc_iomgr_cb_func cb = wc->cb; void *cb_arg = wc->cb_arg; diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h index 98ef91e1db..ea96c19c71 100644 --- a/src/core/iomgr/closure.h +++ b/src/core/iomgr/closure.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H #include <grpc/support/port_platform.h> +#include <stdbool.h> struct grpc_closure; typedef struct grpc_closure grpc_closure; @@ -54,7 +55,7 @@ typedef struct grpc_closure_list { * \param success An indication on the state of the iomgr. On false, cleanup * actions should be taken (eg, shutdown). */ typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg, - int success); + bool success); /** A closure over a grpc_iomgr_cb_func. */ struct grpc_closure { @@ -83,13 +84,13 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); /** add \a closure to the end of \a list and set \a closure's success to \a * success */ void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, - int success); + bool success); /** append all closures from \a src to \a dst and empty \a src. */ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); /** return whether \a list is empty. */ -int grpc_closure_list_empty(grpc_closure_list list); +bool grpc_closure_list_empty(grpc_closure_list list); /** return the next pointer for a queued closure list */ grpc_closure *grpc_closure_next(grpc_closure *closure); diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index 6059a6031c..1fd79f6eba 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,16 +37,16 @@ #include "src/core/profiling/timers.h" -int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { - int did_something = 0; +bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { + bool did_something = 0; GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); while (!grpc_closure_list_empty(exec_ctx->closure_list)) { grpc_closure *c = exec_ctx->closure_list.head; exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { - int success = (int)(c->final_data & 1); + bool success = (bool)(c->final_data & 1); grpc_closure *next = (grpc_closure *)(c->final_data & ~(uintptr_t)1); - did_something++; + did_something = true; GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); c->cb(exec_ctx, c->cb_arg, success); GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); @@ -62,11 +62,15 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { } void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - int success) { + bool success, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_add(&exec_ctx->closure_list, closure, success); } void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, - grpc_closure_list *list) { + grpc_closure_list *list, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_move(list, &exec_ctx->closure_list); } diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h index 43df488094..9a9b2e55fa 100644 --- a/src/core/iomgr/exec_ctx.h +++ b/src/core/iomgr/exec_ctx.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -57,22 +57,29 @@ struct grpc_exec_ctx { grpc_closure_list closure_list; }; +/** A workqueue represents a list of work to be executed asynchronously. + Forward declared here to avoid a circular dependency with workqueue.h. */ +struct grpc_workqueue; +typedef struct grpc_workqueue grpc_workqueue; + #define GRPC_EXEC_CTX_INIT \ { GRPC_CLOSURE_LIST_INIT } /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. - * Returns 1 if work was performed, 0 otherwise. */ -int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); + * Returns true if work was performed, false otherwise. */ +bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); /** Finish any pending work for a grpc_exec_ctx. Must be called before * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); /** Add a closure to be executed at the next flush/finish point */ void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - int success); + bool success, + grpc_workqueue *offload_target_or_null); /** Add a list of closures to be executed at the next flush/finish point. * Leaves \a list empty. */ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, - grpc_closure_list *list); + grpc_closure_list *list, + grpc_workqueue *offload_target_or_null); #endif diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c index 00c68f7828..f22d8f30ac 100644 --- a/src/core/iomgr/executor.c +++ b/src/core/iomgr/executor.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -77,7 +77,7 @@ static void closure_exec_thread_func(void *ignored) { gpr_mu_unlock(&g_executor.mu); break; } else { - grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures); + grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL); } gpr_mu_unlock(&g_executor.mu); grpc_exec_ctx_flush(&exec_ctx); @@ -112,7 +112,7 @@ static void maybe_spawn_locked() { g_executor.pending_join = 1; } -void grpc_executor_enqueue(grpc_closure *closure, int success) { +void grpc_executor_enqueue(grpc_closure *closure, bool success) { gpr_mu_lock(&g_executor.mu); if (g_executor.shutting_down == 0) { grpc_closure_list_add(&g_executor.closures, closure, success); @@ -133,7 +133,7 @@ void grpc_executor_shutdown() { * list below because we aren't accepting new work */ /* Execute pending callbacks, some may be performing cleanups */ - grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures); + grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); if (pending_join) { diff --git a/src/core/iomgr/executor.h b/src/core/iomgr/executor.h index 6da446ae9c..aac057ddf5 100644 --- a/src/core/iomgr/executor.h +++ b/src/core/iomgr/executor.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -45,7 +45,7 @@ void grpc_executor_init(); /** Enqueue \a closure for its eventual execution of \a f(arg) on a separate * thread */ -void grpc_executor_enqueue(grpc_closure *closure, int success); +void grpc_executor_enqueue(grpc_closure *closure, bool success); /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(); diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 89c938bc04..85eadd754b 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -218,7 +218,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { } else { grpc_remove_fd_from_all_epoll_sets(fd->fd); } - grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); } int grpc_fd_wrapped_fd(grpc_fd *fd) { @@ -273,7 +273,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown); + grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -296,7 +296,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown); + grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL); *st = CLOSURE_NOT_READY; return 1; } diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index d3868ce62c..96b6f81024 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -67,7 +67,7 @@ static DWORD deadline_to_millis_timeout(gpr_timespec deadline, return 0; } timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( + return (DWORD)gpr_time_to_millis(gpr_time_add( timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } @@ -120,7 +120,7 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { info->has_pending_iocp = 1; } gpr_mu_unlock(&socket->state_mu); - grpc_exec_ctx_enqueue(exec_ctx, closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL); } void grpc_iocp_init(void) { @@ -179,13 +179,11 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) { static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, grpc_closure *closure, grpc_winsocket_callback_info *info) { - int run_now = 0; GPR_ASSERT(info->closure == NULL); gpr_mu_lock(&socket->state_mu); if (info->has_pending_iocp) { - run_now = 1; info->has_pending_iocp = 0; - grpc_exec_ctx_enqueue(exec_ctx, closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL); } else { info->closure = closure; } diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index d117485327..4acae2bb71 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -141,7 +141,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_status) { + bool iomgr_status) { delayed_add *da = arg; if (!grpc_fd_is_orphaned(da->fd)) { @@ -154,7 +154,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, /* We don't care about this pollset anymore. */ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { da->pollset->called_shutdown = 1; - grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, 1); + grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL); } } gpr_mu_unlock(&da->pollset->mu); @@ -178,7 +178,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, GRPC_FD_REF(fd, "delayed_add"); grpc_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_exec_ctx_enqueue(exec_ctx, &da->closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL); } } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index c325b634ae..a8e2e22977 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -243,7 +243,7 @@ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); pollset->vtable->finish_shutdown(pollset); - grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, 1); + grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -271,7 +271,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (!grpc_pollset_has_workers(pollset) && !grpc_closure_list_empty(pollset->idle_jobs)) { GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0); - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); goto done; } /* Check alarms - these are a global resource so we just ping @@ -365,7 +365,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ gpr_mu_lock(&pollset->mu); } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); gpr_mu_unlock(&pollset->mu); grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->mu); @@ -381,7 +381,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->shutdown_done = closure; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!grpc_pollset_has_workers(pollset)) { - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); } if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && !grpc_pollset_has_workers(pollset)) { @@ -419,7 +419,8 @@ typedef struct grpc_unary_promote_args { grpc_closure promotion_closure; } grpc_unary_promote_args; -static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) { +static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, + bool success) { grpc_unary_promote_args *up_args = args; const grpc_pollset_vtable *original_vtable = up_args->original_vtable; grpc_pollset *pollset = up_args->pollset; diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index deb661548d..2e650ca939 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -81,15 +81,6 @@ static grpc_pollset_worker *pop_front_worker( } } -static void push_back_worker(grpc_pollset_worker *root, - grpc_pollset_worker_link_type type, - grpc_pollset_worker *worker) { - worker->links[type].next = root; - worker->links[type].prev = worker->links[type].next->links[type].prev; - worker->links[type].prev->links[type].next = - worker->links[type].next->links[type].prev = worker; -} - static void push_front_worker(grpc_pollset_worker *root, grpc_pollset_worker_link_type type, grpc_pollset_worker *worker) { @@ -116,7 +107,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset->is_iocp_worker) { - grpc_exec_ctx_enqueue(exec_ctx, closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL); } else { pollset->on_shutdown = closure; } @@ -174,7 +165,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } if (pollset->shutting_down && pollset->on_shutdown != NULL) { - grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, 1); + grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, true, NULL); pollset->on_shutdown = NULL; } goto done; diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index 555c74ce7e..c51745b918 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -152,7 +152,7 @@ done: /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ -static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) { +static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { request *r = rp; grpc_resolved_addresses *resolved = grpc_blocking_resolve_address(r->name, r->default_port); diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index 007c855d10..28c8661e73 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -135,7 +135,7 @@ done: /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ -static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) { +static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { request *r = rp; grpc_resolved_addresses *resolved = grpc_blocking_resolve_address(r->name, r->default_port); diff --git a/src/core/iomgr/sockaddr_win32.h b/src/core/iomgr/sockaddr_win32.h index fe2be99145..8e3946a7d8 100644 --- a/src/core/iomgr/sockaddr_win32.h +++ b/src/core/iomgr/sockaddr_win32.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,9 +38,4 @@ #include <ws2tcpip.h> #include <mswsock.h> -#ifdef __MINGW32__ -/* mingw seems to be missing that definition. */ -const char *inet_ntop(int af, const void *src, char *dst, socklen_t size); -#endif - #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_WIN32_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index d9d24ee9a3..c76c2e3b0f 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -91,7 +91,7 @@ error: return 0; } -static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int success) { +static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) { int done; async_connect *ac = acp; if (grpc_tcp_trace) { @@ -111,7 +111,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int success) { } } -static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) { +static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; @@ -206,7 +206,7 @@ finish: gpr_free(ac->addr_str); gpr_free(ac); } - grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL); + grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL); } void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, @@ -243,7 +243,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, addr_len = sizeof(addr4_copy); } if (!prepare_socket(addr, fd)) { - grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); return; } @@ -259,14 +259,14 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, if (err >= 0) { *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); - grpc_exec_ctx_enqueue(exec_ctx, closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno)); grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); - grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); goto done; } diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index e5691b7e12..689c6f7b10 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -74,7 +74,7 @@ static void async_connect_unlock_and_cleanup(async_connect *ac) { } } -static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int occured) { +static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool occured) { async_connect *ac = acp; gpr_mu_lock(&ac->mu); /* If the alarm didn't occur, it got cancelled. */ @@ -84,7 +84,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int occured) { async_connect_unlock_and_cleanup(ac); } -static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, int from_iocp) { +static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) { async_connect *ac = acp; SOCKET sock = ac->socket->socket; grpc_endpoint **ep = ac->endpoint; @@ -215,7 +215,7 @@ failure: } else if (sock != INVALID_SOCKET) { closesocket(sock); } - grpc_exec_ctx_enqueue(exec_ctx, on_done, 0); + grpc_exec_ctx_enqueue(exec_ctx, on_done, false, NULL); } #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 4fa8ca8c71..048e907441 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -100,9 +100,9 @@ typedef struct { } grpc_tcp; static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - int success); + bool success); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - int success); + bool success); static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -247,7 +247,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - int success) { + bool success) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); @@ -273,7 +273,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->finished_edge = 0; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL); } } @@ -360,7 +360,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) { } static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - int success) { + bool success) { grpc_tcp *tcp = (grpc_tcp *)arg; flush_result status; grpc_closure *cb; @@ -407,7 +407,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_enqueue(exec_ctx, cb, 1); + grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL); return; } tcp->outgoing_buffer = buf; @@ -420,7 +420,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE); + grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL); } GPR_TIMER_END("tcp_write", 0); diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index adf14aeb59..5e07f8261c 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -160,7 +160,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL); } gpr_mu_destroy(&s->mu); @@ -174,7 +174,8 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(s); } -static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, int success) { +static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, + bool success) { grpc_tcp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; @@ -317,7 +318,7 @@ error: } /* event manager callback when reads are ready */ -static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_tcp_listener *sp = arg; grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, sp->fd_index}; @@ -602,7 +603,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { /* Complete shutdown_starting work before destroying. */ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&s->mu); - grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting); + grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL); gpr_mu_unlock(&s->mu); if (exec_ctx == NULL) { grpc_exec_ctx_flush(&local_exec_ctx); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 8ee8149f25..ce930b8f41 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -119,7 +119,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL); } /* Now that the accepts have been aborted, we can destroy the sockets. @@ -173,7 +173,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { /* Complete shutdown_starting work before destroying. */ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&s->mu); - grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting); + grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL); gpr_mu_unlock(&s->mu); if (exec_ctx == NULL) { grpc_exec_ctx_flush(&local_exec_ctx); @@ -311,7 +311,7 @@ failure: } /* Event manager callback when reads are ready. */ -static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { +static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) { grpc_tcp_listener *sp = arg; grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0}; SOCKET sock = sp->new_socket; @@ -531,7 +531,7 @@ int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index, for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index) ; if (sp) { - return _open_osfhandle(sp->socket->socket, 0); + return _open_osfhandle((intptr_t)sp->socket->socket, 0); } else { return -1; } diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index cc7f7ff8d2..038e4158c8 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -138,15 +138,12 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif /* Asynchronous callback from the IOCP, or the background thread. */ -static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, int success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) { grpc_tcp *tcp = tcpp; grpc_closure *cb = tcp->read_cb; grpc_winsocket *socket = tcp->socket; gpr_slice sub; - gpr_slice *slice = NULL; - size_t nslices = 0; grpc_winsocket_callback_info *info = &socket->read_info; - int do_abort = 0; if (success) { if (socket->read_info.wsa_error != 0 && !tcp->shutting_down) { @@ -187,7 +184,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_exec_ctx_enqueue(exec_ctx, cb, 0); + grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL); return; } @@ -211,7 +208,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { info->bytes_transfered = bytes_read; - grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, 1); + grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, true, NULL); return; } @@ -224,7 +221,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; - grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, 0); + grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, false, NULL); return; } } @@ -233,12 +230,11 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } /* Asynchronous callback from the IOCP, or the background thread. */ -static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, int success) { +static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) { grpc_tcp *tcp = (grpc_tcp *)tcpp; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->write_info; grpc_closure *cb; - int do_abort = 0; gpr_mu_lock(&tcp->mu); cb = tcp->write_cb; @@ -277,7 +273,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_exec_ctx_enqueue(exec_ctx, cb, 0); + grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL); return; } @@ -305,9 +301,9 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, connection that has its send queue filled up. But if we don't, then we can avoid doing an async write operation at all. */ if (info->wsa_error != WSAEWOULDBLOCK) { - int ok = 0; + bool ok = false; if (status == 0) { - ok = 1; + ok = true; GPR_ASSERT(bytes_sent == tcp->write_slices->length); } else { if (socket->read_info.wsa_error != WSAECONNRESET) { @@ -317,7 +313,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } } if (allocated) gpr_free(allocated); - grpc_exec_ctx_enqueue(exec_ctx, cb, ok); + grpc_exec_ctx_enqueue(exec_ctx, cb, ok, NULL); return; } @@ -334,7 +330,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); - grpc_exec_ctx_enqueue(exec_ctx, cb, 0); + grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL); return; } } diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c index 24d6204e07..a33d8f63a0 100644 --- a/src/core/iomgr/timer.c +++ b/src/core/iomgr/timer.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -224,7 +224,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); if (!timer->triggered) { - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, 0); + grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL); timer->triggered = 1; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -290,7 +290,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success); + grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success, NULL); n++; } *new_min_deadline = compute_min_deadline(shard); diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index a1a6b04cad..fe006c603c 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -425,15 +425,5 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, gpr_mu_unlock(&s->mu); } -/* TODO(rjshade): Add a test for this method. */ -void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len, - const struct sockaddr *peer_address) { - ssize_t rc; - rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, sizeof(peer_address)); - if (rc < 0) { - gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno)); - } -} - #endif #endif diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index de5736c426..73a21c80ab 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -72,12 +72,4 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server, grpc_closure *on_done); -/* Write the contents of buffer to the underlying UDP socket. */ -/* -void grpc_udp_server_write(grpc_udp_server *s, - const char *buffer, - int buf_len, - const struct sockaddr* to); - */ - #endif /* GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H */ diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h index 714536233c..36dd133468 100644 --- a/src/core/iomgr/workqueue.h +++ b/src/core/iomgr/workqueue.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -47,9 +47,7 @@ #include "src/core/iomgr/workqueue_windows.h" #endif -/** A workqueue represents a list of work to be executed asynchronously. */ -struct grpc_workqueue; -typedef struct grpc_workqueue grpc_workqueue; +/* grpc_workqueue is forward declared in exec_ctx.h */ /** Create a work queue */ grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx); diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index d2a1c34612..da11df67ef 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -45,7 +45,7 @@ #include "src/core/iomgr/fd_posix.h" -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success); +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) { char name[32]; @@ -110,7 +110,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_unlock(&workqueue->mu); } -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_workqueue *workqueue = arg; if (!success) { diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 8b56c57645..afba0079f5 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -793,7 +793,7 @@ static void md_only_test_destruct(grpc_call_credentials *creds) { } static void on_simulated_token_fetch_done(grpc_exec_ctx *exec_ctx, - void *user_data, int success) { + void *user_data, bool success) { grpc_credentials_metadata_request *r = (grpc_credentials_metadata_request *)user_data; grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds; @@ -812,7 +812,7 @@ static void md_only_test_get_request_metadata( grpc_credentials_metadata_request *cb_arg = grpc_credentials_metadata_request_create(creds, cb, user_data); grpc_executor_enqueue( - grpc_closure_create(on_simulated_token_fetch_done, cb_arg), 1); + grpc_closure_create(on_simulated_token_fetch_done, cb_arg), true); } else { cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK); } diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 5385e41130..f3ac14568a 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -86,7 +86,7 @@ static void on_compute_engine_detection_http_response( gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset)); } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int s) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) { grpc_pollset_destroy(p); } @@ -157,7 +157,7 @@ static grpc_call_credentials *create_default_creds_from_path(char *creds_path) { if (grpc_auth_json_key_is_valid(&key)) { result = grpc_service_account_jwt_access_credentials_create_from_auth_json_key( - key, grpc_max_auth_token_lifetime); + key, grpc_max_auth_token_lifetime()); goto end; } diff --git a/src/core/security/handshake.c b/src/core/security/handshake.c index 364b765396..a8b2fef629 100644 --- a/src/core/security/handshake.c +++ b/src/core/security/handshake.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -61,10 +61,10 @@ typedef struct { } grpc_security_handshake; static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, - void *setup, int success); + void *setup, bool success); static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup, - int success); + bool success); static void security_connector_remove_handshake(grpc_security_handshake *h) { grpc_security_connector_handshake_list *node; @@ -198,7 +198,8 @@ static void send_handshake_bytes_to_peer(grpc_exec_ctx *exec_ctx, } static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, - void *handshake, int success) { + void *handshake, + bool success) { grpc_security_handshake *h = handshake; size_t consumed_slice_size = 0; tsi_result result = TSI_OK; @@ -265,7 +266,7 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, /* If handshake is NULL, the handshake is done. */ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, - void *handshake, int success) { + void *handshake, bool success) { grpc_security_handshake *h = handshake; /* Make sure that write is OK. */ diff --git a/src/core/security/json_token.c b/src/core/security/json_token.c index 4d4bc4baad..762f02989a 100644 --- a/src/core/security/json_token.c +++ b/src/core/security/json_token.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -49,7 +49,13 @@ /* --- Constants. --- */ /* 1 hour max. */ -const gpr_timespec grpc_max_auth_token_lifetime = {3600, 0, GPR_TIMESPAN}; +gpr_timespec grpc_max_auth_token_lifetime() { + gpr_timespec out; + out.tv_sec = 3600; + out.tv_nsec = 0; + out.clock_type = GPR_TIMESPAN; + return out; +} #define GRPC_JWT_RSA_SHA256_ALGORITHM "RS256" #define GRPC_JWT_TYPE "JWT" @@ -211,9 +217,9 @@ static char *encoded_jwt_claim(const grpc_auth_json_key *json_key, gpr_timespec expiration = gpr_time_add(now, token_lifetime); char now_str[GPR_LTOA_MIN_BUFSIZE]; char expiration_str[GPR_LTOA_MIN_BUFSIZE]; - if (gpr_time_cmp(token_lifetime, grpc_max_auth_token_lifetime) > 0) { + if (gpr_time_cmp(token_lifetime, grpc_max_auth_token_lifetime()) > 0) { gpr_log(GPR_INFO, "Cropping token lifetime to maximum allowed value."); - expiration = gpr_time_add(now, grpc_max_auth_token_lifetime); + expiration = gpr_time_add(now, grpc_max_auth_token_lifetime()); } int64_ttoa(now.tv_sec, now_str); int64_ttoa(expiration.tv_sec, expiration_str); diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 9b87e268c6..d11c43be20 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -126,7 +126,7 @@ static void flush_read_staging_buffer(secure_endpoint *ep, uint8_t **cur, } static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep, - int success) { + bool success) { if (grpc_trace_secure_endpoint) { size_t i; for (i = 0; i < ep->read_buffer->count; i++) { @@ -137,11 +137,11 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep, } } ep->read_buffer = NULL; - grpc_exec_ctx_enqueue(exec_ctx, ep->read_cb, success); + grpc_exec_ctx_enqueue(exec_ctx, ep->read_cb, success, NULL); SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read"); } -static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { unsigned i; uint8_t keep_looping = 0; tsi_result result = TSI_OK; @@ -315,7 +315,7 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ gpr_slice_buffer_reset_and_unref(&ep->output_buffer); - grpc_exec_ctx_enqueue(exec_ctx, cb, 0); + grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL); return; } diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index 61336a1057..bdccbabfea 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -61,6 +61,14 @@ static const char *installed_roots_path = INSTALL_PREFIX "/share/grpc/roots.pem"; #endif +/* -- Overridden default roots. -- */ + +static grpc_ssl_roots_override_callback ssl_roots_override_cb = NULL; + +void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb) { + ssl_roots_override_cb = cb; +} + /* -- Cipher suites. -- */ /* Defines the cipher suites that we accept by default. All these cipher suites @@ -595,23 +603,44 @@ static grpc_security_connector_vtable ssl_channel_vtable = { static grpc_security_connector_vtable ssl_server_vtable = { ssl_server_destroy, ssl_server_do_handshake, ssl_server_check_peer}; -static gpr_slice default_pem_root_certs; +static gpr_slice compute_default_pem_root_certs_once(void) { + gpr_slice result = gpr_empty_slice(); -static void init_default_pem_root_certs(void) { /* First try to load the roots from the environment. */ char *default_root_certs_path = gpr_getenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR); - if (default_root_certs_path == NULL) { - default_pem_root_certs = gpr_empty_slice(); - } else { - default_pem_root_certs = gpr_load_file(default_root_certs_path, 0, NULL); + if (default_root_certs_path != NULL) { + result = gpr_load_file(default_root_certs_path, 0, NULL); gpr_free(default_root_certs_path); } + /* Try overridden roots if needed. */ + grpc_ssl_roots_override_result ovrd_res = GRPC_SSL_ROOTS_OVERRIDE_FAIL; + if (GPR_SLICE_IS_EMPTY(result) && ssl_roots_override_cb != NULL) { + char *pem_root_certs = NULL; + ovrd_res = ssl_roots_override_cb(&pem_root_certs); + if (ovrd_res == GRPC_SSL_ROOTS_OVERRIDE_OK) { + GPR_ASSERT(pem_root_certs != NULL); + result = gpr_slice_new(pem_root_certs, strlen(pem_root_certs), gpr_free); + } + } + /* Fall back to installed certs if needed. */ - if (GPR_SLICE_IS_EMPTY(default_pem_root_certs)) { - default_pem_root_certs = gpr_load_file(installed_roots_path, 0, NULL); + if (GPR_SLICE_IS_EMPTY(result) && + ovrd_res != GRPC_SSL_ROOTS_OVERRIDE_FAIL_PERMANENTLY) { + result = gpr_load_file(installed_roots_path, 0, NULL); } + return result; +} + +static gpr_slice default_pem_root_certs; + +static void init_default_pem_root_certs(void) { + default_pem_root_certs = compute_default_pem_root_certs_once(); +} + +gpr_slice grpc_get_default_ssl_roots_for_testing(void) { + return compute_default_pem_root_certs_once(); } size_t grpc_get_default_ssl_roots(const unsigned char **pem_root_certs) { diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h index 2b734109b3..39df7821f0 100644 --- a/src/core/security/security_connector.h +++ b/src/core/security/security_connector.h @@ -209,6 +209,9 @@ grpc_security_status grpc_ssl_channel_security_connector_create( /* Gets the default ssl roots. */ size_t grpc_get_default_ssl_roots(const unsigned char **pem_root_certs); +/* Exposed for TESTING ONLY!. */ +gpr_slice grpc_get_default_ssl_roots_for_testing(void); + /* Config for ssl servers. */ typedef struct { unsigned char **pem_private_keys; diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index d5c8c54369..4c78711387 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -153,7 +153,7 @@ static void on_md_processing_done( } static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, - int success) { + bool success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 08713fceaf..ee3443c828 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -142,7 +142,7 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, on_accept, state); } -static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) { +static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) { grpc_server_secure_state *state = statep; if (state->destroy_callback != NULL) { state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg, diff --git a/src/core/statistics/census_init.c b/src/core/statistics/census_init.c index e6306f5e6f..b6a962f228 100644 --- a/src/core/statistics/census_init.c +++ b/src/core/statistics/census_init.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/support/env_win32.c b/src/core/support/env_win32.c index 6b1ff102b0..10258283ba 100644 --- a/src/core/support/env_win32.c +++ b/src/core/support/env_win32.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,7 +38,12 @@ #include "src/core/support/env.h" #include "src/core/support/string.h" +#ifdef __MINGW32__ +errno_t getenv_s(size_t *size_needed, char *buffer, size_t size, + const char *varname); +#else #include <stdlib.h> +#endif #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -47,14 +52,17 @@ char *gpr_getenv(const char *name) { size_t size; char *result = NULL; - char *duplicated; errno_t err; - err = _dupenv_s(&result, &size, name); - if (err) return NULL; - duplicated = gpr_strdup(result); - free(result); - return duplicated; + err = getenv_s(&size, NULL, 0, name); + if (err || (size == 0)) return NULL; + result = gpr_malloc(size); + err = getenv_s(&size, result, size, name); + if (err) { + gpr_free(result); + return NULL; + } + return result; } void gpr_setenv(const char *name, const char *value) { diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c index 40adcd1b50..e18e667fe5 100644 --- a/src/core/support/log_win32.c +++ b/src/core/support/log_win32.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -109,13 +109,13 @@ void gpr_default_log(gpr_log_func_args *args) { fflush(stderr); } -char *gpr_format_message(DWORD messageid) { +char *gpr_format_message(int messageid) { LPTSTR tmessage; char *message; DWORD status = FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR)(&tmessage), 0, NULL); if (status == 0) return gpr_strdup("Unable to retrieve error string"); message = gpr_tchar_to_char(tmessage); diff --git a/src/core/support/string_win32.c b/src/core/support/string_win32.c index 914ba8771c..3b1f702cf1 100644 --- a/src/core/support/string_win32.c +++ b/src/core/support/string_win32.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -85,8 +85,8 @@ LPTSTR gpr_char_to_tchar(LPCSTR input) { LPTSTR ret; int needed = MultiByteToWideChar(CP_UTF8, 0, input, -1, NULL, 0); - if (needed == 0) return NULL; - ret = gpr_malloc(needed * sizeof(TCHAR)); + if (needed <= 0) return NULL; + ret = gpr_malloc((unsigned)needed * sizeof(TCHAR)); MultiByteToWideChar(CP_UTF8, 0, input, -1, ret, needed); return ret; } @@ -95,8 +95,8 @@ LPSTR gpr_tchar_to_char(LPCTSTR input) { LPSTR ret; int needed = WideCharToMultiByte(CP_UTF8, 0, input, -1, NULL, 0, NULL, NULL); - if (needed == 0) return NULL; - ret = gpr_malloc(needed); + if (needed <= 0) return NULL; + ret = gpr_malloc((unsigned)needed); WideCharToMultiByte(CP_UTF8, 0, input, -1, ret, needed, NULL, NULL); return ret; } diff --git a/src/core/support/subprocess_windows.c b/src/core/support/subprocess_windows.c new file mode 100644 index 0000000000..d48c5437f0 --- /dev/null +++ b/src/core/support/subprocess_windows.c @@ -0,0 +1,141 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_WINDOWS_SUBPROCESS + +#include <windows.h> +#include <string.h> +#include <tchar.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/subprocess.h> +#include "src/core/support/string.h" +#include "src/core/support/string_win32.h" + +struct gpr_subprocess { + PROCESS_INFORMATION pi; + int joined; + int interrupted; +}; + +const char *gpr_subprocess_binary_extension() { return ".exe"; } + +gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) { + gpr_subprocess *r; + + STARTUPINFO si; + PROCESS_INFORMATION pi; + + char *args = gpr_strjoin_sep(argv, argc, " ", NULL); + TCHAR *args_tchar; + + args_tchar = gpr_char_to_tchar(args); + gpr_free(args); + + memset(&si, 0, sizeof(si)); + si.cb = sizeof(si); + memset(&pi, 0, sizeof(pi)); + + if (!CreateProcess(NULL, args_tchar, NULL, NULL, FALSE, + CREATE_NEW_PROCESS_GROUP, NULL, NULL, &si, &pi)) { + gpr_free(args_tchar); + return NULL; + } + gpr_free(args_tchar); + + r = gpr_malloc(sizeof(gpr_subprocess)); + memset(r, 0, sizeof(*r)); + r->pi = pi; + return r; +} + +void gpr_subprocess_destroy(gpr_subprocess *p) { + if (p) { + if (!p->joined) { + gpr_subprocess_interrupt(p); + gpr_subprocess_join(p); + } + if (p->pi.hProcess) { + CloseHandle(p->pi.hProcess); + } + if (p->pi.hThread) { + CloseHandle(p->pi.hThread); + } + gpr_free(p); + } +} + +int gpr_subprocess_join(gpr_subprocess *p) { + DWORD dwExitCode; + if (GetExitCodeProcess(p->pi.hProcess, &dwExitCode)) { + if (dwExitCode == STILL_ACTIVE) { + if (WaitForSingleObject(p->pi.hProcess, INFINITE) == WAIT_OBJECT_0) { + p->joined = 1; + goto getExitCode; + } + return -1; // failed to join + } else { + goto getExitCode; + } + } else { + return -1; // failed to get exit code + } + +getExitCode: + if (p->interrupted) { + return 0; + } + if (GetExitCodeProcess(p->pi.hProcess, &dwExitCode)) { + return dwExitCode; + } else { + return -1; // failed to get exit code + } +} + +void gpr_subprocess_interrupt(gpr_subprocess *p) { + DWORD dwExitCode; + if (GetExitCodeProcess(p->pi.hProcess, &dwExitCode)) { + if (dwExitCode == STILL_ACTIVE) { + gpr_log(GPR_INFO, "sending ctrl-break"); + GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, p->pi.dwProcessId); + p->joined = 1; + p->interrupted = 1; + } + } + return; +} + +#endif /* GPR_WINDOWS_SUBPROCESS */ diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c index 51a082b29e..84d412a75f 100644 --- a/src/core/support/sync_win32.c +++ b/src/core/support/sync_win32.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -94,7 +94,11 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { if (now_ms >= deadline_ms) { timeout = 1; } else { - timeout_max_ms = (DWORD)min(deadline_ms - now_ms, INFINITE - 1); + if ((deadline_ms - now_ms) >= INFINITE) { + timeout_max_ms = INFINITE - 1; + } else { + timeout_max_ms = (DWORD)(deadline_ms - now_ms); + } timeout = (SleepConditionVariableCS(cv, &mu->cs, timeout_max_ms) == 0 && GetLastError() == ERROR_TIMEOUT); } diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index 2bed0f6a9c..8af957e6f4 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,9 +37,12 @@ #ifdef GPR_WIN32 +#include <grpc/support/log.h> #include <grpc/support/time.h> #include <src/core/support/time_precise.h> #include <sys/timeb.h> +#include <process.h> +#include <limits.h> #include "src/core/support/block_annotate.h" @@ -50,11 +53,12 @@ void gpr_time_init(void) { LARGE_INTEGER frequency; QueryPerformanceFrequency(&frequency); QueryPerformanceCounter(&g_start_time); - g_time_scale = 1.0 / frequency.QuadPart; + g_time_scale = 1.0 / (double)frequency.QuadPart; } gpr_timespec gpr_now(gpr_clock_type clock) { gpr_timespec now_tv; + LONGLONG diff; struct _timeb now_tb; LARGE_INTEGER timestamp; double now_dbl; @@ -68,10 +72,14 @@ gpr_timespec gpr_now(gpr_clock_type clock) { case GPR_CLOCK_MONOTONIC: case GPR_CLOCK_PRECISE: QueryPerformanceCounter(×tamp); - now_dbl = (timestamp.QuadPart - g_start_time.QuadPart) * g_time_scale; + diff = timestamp.QuadPart - g_start_time.QuadPart; + now_dbl = (double)diff * g_time_scale; now_tv.tv_sec = (int64_t)now_dbl; now_tv.tv_nsec = (int32_t)((now_dbl - (double)now_tv.tv_sec) * 1e9); break; + case GPR_TIMESPAN: + abort(); + break; } return now_tv; } @@ -79,7 +87,7 @@ gpr_timespec gpr_now(gpr_clock_type clock) { void gpr_sleep_until(gpr_timespec until) { gpr_timespec now; gpr_timespec delta; - DWORD sleep_millis; + int64_t sleep_millis; for (;;) { /* We could simplify by using clock_nanosleep instead, but it might be @@ -91,9 +99,10 @@ void gpr_sleep_until(gpr_timespec until) { delta = gpr_time_sub(until, now); sleep_millis = - (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS; + delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS; + GPR_ASSERT((sleep_millis >= 0) && (sleep_millis <= INT_MAX)); GRPC_SCHEDULING_START_BLOCKING_REGION; - Sleep(sleep_millis); + Sleep((DWORD)sleep_millis); GRPC_SCHEDULING_END_BLOCKING_REGION; } } diff --git a/src/core/surface/alarm.c b/src/core/surface/alarm.c index 7c47dd56f8..d753023ca9 100644 --- a/src/core/surface/alarm.c +++ b/src/core/surface/alarm.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -48,7 +48,7 @@ struct grpc_alarm { static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg, grpc_cq_completion *c) {} -static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_alarm *alarm = arg; grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, success, do_nothing_end_completion, NULL, &alarm->completion); @@ -65,7 +65,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm, gpr_now(GPR_CLOCK_MONOTONIC)); - grpc_cq_begin_op(cq); + grpc_cq_begin_op(cq, tag); grpc_exec_ctx_finish(&exec_ctx); return alarm; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 880666bb38..9495e748b5 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -229,9 +229,9 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, - int success); + bool success); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - int success); + bool success); grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, @@ -351,7 +351,7 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } -static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) { +static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { size_t i; int ii; grpc_call *c = call; @@ -688,13 +688,13 @@ typedef struct cancel_closure { grpc_status_code status; } cancel_closure; -static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) { +static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { cancel_closure *cc = ccp; GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel"); gpr_free(cc); } -static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) { +static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { grpc_transport_stream_op op; cancel_closure *cc = ccp; memset(&op, 0, sizeof(op)); @@ -721,7 +721,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, cc->call = c; cc->status = status; GRPC_CALL_INTERNAL_REF(c, "cancel"); - grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL); return GRPC_CALL_OK; } @@ -757,7 +757,7 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } -static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_call *call = arg; gpr_mu_lock(&call->mu); call->have_alarm = 0; @@ -934,7 +934,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; if (bctl->is_notify_tag_closure) { - grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success); + grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL); gpr_mu_lock(&call->mu); bctl->call->used_batches = (uint8_t)(bctl->call->used_batches & @@ -974,7 +974,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, } static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - int success) { + bool success) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; @@ -993,7 +993,7 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } -static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) { +static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; grpc_call *child_call; @@ -1066,7 +1066,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) { } static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - int success) { + bool success) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 001692376f..12d8ebceb9 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -80,7 +80,7 @@ struct grpc_channel { /* the protobuf library will (by default) start warning at 100megs */ #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) -static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, int success); +static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success); grpc_channel *grpc_channel_create_from_filters( grpc_exec_ctx *exec_ctx, const char *target, @@ -268,7 +268,7 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, } static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { grpc_channel *channel = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel)); while (channel->registered_calls) { diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 10f5c4da4d..2dd4fce26b 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -165,11 +165,11 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, } } -static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) { +static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) { partly_done(exec_ctx, pw, 1); } -static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) { +static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) { partly_done(exec_ctx, pw, 0); } diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 49083f0870..4d4337d288 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -80,11 +80,11 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { connector_unref(exec_ctx, arg); } -static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) { connector *c = arg; grpc_closure *notify; grpc_endpoint *tcp = c->tcp; diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c index b4ce282787..983f1c8a66 100644 --- a/src/core/surface/channel_ping.c +++ b/src/core/surface/channel_ping.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -53,7 +53,7 @@ static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(arg); } -static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, bool success) { ping_result *pr = arg; grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr, &pr->completion_storage); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index c0db9c508a..75298eb795 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -86,7 +86,7 @@ static gpr_mu g_freelist_mu; grpc_completion_queue *g_freelist; static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, - int success); + bool success); void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); } @@ -169,7 +169,7 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { grpc_completion_queue *cc = arg; GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); } diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 19cea4c4f6..e3ab70dba7 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -117,8 +117,10 @@ void grpc_init(void) { grpc_iomgr_init(); grpc_executor_init(); grpc_tracer_init("GRPC_TRACE"); - /* Only initialize census if noone else has. */ - if (census_enabled() == CENSUS_FEATURE_NONE) { + /* Only initialize census if no one else has and some features are + * available. */ + if (census_enabled() == CENSUS_FEATURE_NONE && + census_supported() != CENSUS_FEATURE_NONE) { if (census_initialize(census_supported())) { /* enable all features. */ gpr_log(GPR_ERROR, "Could not initialize census."); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index a60e9d20da..705996cad3 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -78,8 +78,8 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } else if (op->recv_trailing_metadata != NULL) { fill_metadata(elem, op->recv_trailing_metadata); } - grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0); - grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0); + grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); + grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 552a570713..dd1441ad67 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -128,14 +128,14 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { connector *c = arg; grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base, c->connecting_endpoint, on_secure_handshake_done, c); } -static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) { connector *c = arg; grpc_closure *notify; grpc_endpoint *tcp = c->newly_connecting_endpoint; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 79db13810a..42cffccb4c 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -260,7 +260,7 @@ struct shutdown_cleanup_args { }; static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_status_ignored) { + bool iomgr_status_ignored) { struct shutdown_cleanup_args *a = arg; gpr_slice_unref(a->slice); gpr_free(a); @@ -313,7 +313,7 @@ static void request_matcher_destroy(request_matcher *rm) { gpr_stack_lockfree_destroy(rm->requests); } -static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, int success) { +static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } @@ -328,7 +328,7 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); } } @@ -392,7 +392,7 @@ static void orphan_channel(channel_data *chand) { } static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, - int success) { + bool success) { channel_data *chand = cd; grpc_server *server = chand->server; GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server"); @@ -407,7 +407,8 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { maybe_finish_shutdown(exec_ctx, chand->server); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, true, + NULL); } static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -420,7 +421,7 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); return; } @@ -569,7 +570,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - int success) { + bool success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; gpr_timespec op_deadline; @@ -609,7 +610,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - int success) { + bool success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; if (success) { @@ -620,7 +621,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -653,7 +654,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, } static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, - int iomgr_status_ignored) { + bool iomgr_status_ignored) { channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { @@ -779,9 +780,7 @@ grpc_server *grpc_server_create_from_filters( const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args) { size_t i; - /* TODO(census): restore this once we finalize census filter etc. - int census_enabled = grpc_channel_args_is_census_enabled(args); */ - int census_enabled = 0; + int census_enabled = grpc_channel_args_is_census_enabled(args); grpc_server *server = gpr_malloc(sizeof(grpc_server)); @@ -987,7 +986,7 @@ void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, } static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s, - int success) { + bool success) { grpc_server *server = s; gpr_mu_lock(&server->mu_global); server->listeners_destroyed++; @@ -1142,7 +1141,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, + NULL); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; @@ -1231,7 +1231,7 @@ done: } static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, - void *user_data, int success); + void *user_data, bool success); static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { gpr_slice slice = value->slice; @@ -1317,7 +1317,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, } static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc, - int success) { + bool success) { requested_call *rc = prc; grpc_call *call = *rc->call; grpc_call_element *elem = diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 6e21d2dcd7..ce970dfe73 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -82,7 +82,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_enqueue(exec_ctx, destroy_done, 1); + grpc_exec_ctx_enqueue(exec_ctx, destroy_done, true, NULL); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index f30093e06b..5e37e80948 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -43,9 +43,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { const grpc_channel_filter *filters[3]; size_t num_filters = 0; filters[num_filters++] = &grpc_compress_filter; - if (grpc_channel_args_is_census_enabled(args)) { - filters[num_filters++] = &grpc_server_census_filter; - } GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); return grpc_server_create_from_filters(filters, num_filters, args); } diff --git a/src/core/surface/version.c b/src/core/surface/version.c index aada18e07e..262a13f184 100644 --- a/src/core/surface/version.c +++ b/src/core/surface/version.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/transport/chttp2/hpack_encoder.c b/src/core/transport/chttp2/hpack_encoder.c index 89a80d896c..f30f574d06 100644 --- a/src/core/transport/chttp2/hpack_encoder.c +++ b/src/core/transport/chttp2/hpack_encoder.c @@ -283,7 +283,7 @@ static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c, len_val_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)len_val, 1); GRPC_CHTTP2_WRITE_VARINT(key_index, 2, 0x40, add_tiny_header_data(st, len_pfx), len_pfx); - GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, 0x00, + GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); add_header_data(st, gpr_slice_ref(value_slice)); } @@ -300,7 +300,7 @@ static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, len_val_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)len_val, 1); GRPC_CHTTP2_WRITE_VARINT(key_index, 4, 0x00, add_tiny_header_data(st, len_pfx), len_pfx); - GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, 0x00, + GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); add_header_data(st, gpr_slice_ref(value_slice)); } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index a8262b7af2..c611496e7e 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H #include <assert.h> +#include <stdbool.h> #include "src/core/iomgr/endpoint.h" #include "src/core/transport/chttp2/frame.h" @@ -67,6 +68,9 @@ typedef enum { GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, + /* streams waiting for the outgoing window in the writing path, they will be + * merged to the stalled list or writable list under transport lock. */ + GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT, /** streams that are waiting to start because there are too many concurrent streams on the connection */ GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY, @@ -488,7 +492,7 @@ void grpc_chttp2_perform_writes( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, - void *transport_writing, int success); + void *transport_writing, bool success); void grpc_chttp2_cleanup_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); @@ -504,11 +508,11 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); -/** Get a writable stream - returns non-zero if there was a stream available */ void grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +/** Get a writable stream + returns non-zero if there was a stream available */ int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, @@ -560,9 +564,12 @@ int grpc_chttp2_list_pop_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); -void grpc_chttp2_list_add_stalled_by_transport( +void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); +void grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, bool is_window_available); + int grpc_chttp2_list_pop_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 273a513e2f..2f31a47cb3 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -313,12 +313,27 @@ int grpc_chttp2_list_pop_check_read_ops( return r; } -void grpc_chttp2_list_add_stalled_by_transport( +void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), - GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); + GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); +} + +void grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, + bool is_window_available) { + grpc_chttp2_stream *stream; + grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); + while (stream_list_pop(transport, &stream, + GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { + if (is_window_available) { + grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global); + } else { + stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); + } + } } int grpc_chttp2_list_pop_stalled_by_transport( diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index fdad05b5fb..095883c66d 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -130,8 +130,8 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); } } else { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - stream_writing); + grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, + stream_writing); } } if (stream_global->send_trailing_metadata) { @@ -188,7 +188,7 @@ void grpc_chttp2_perform_writes( grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, &transport_writing->done_cb); } else { - grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, 1); + grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, true, NULL); } } @@ -273,8 +273,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, stream_writing->sent_message = 1; } } else if (transport_writing->outgoing_window == 0) { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - stream_writing); + grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, + stream_writing); grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } } @@ -312,8 +312,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, /* do nothing - already reffed */ } } else { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - stream_writing); + grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, + stream_writing); grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } } else { @@ -329,6 +329,10 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; + bool is_window_available = transport_writing->outgoing_window > 0; + + grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing, + is_window_available); while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 05b25fd8b0..9298573c7f 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -86,14 +86,14 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(grpc_exec_ctx *exec_ctx, void *t, - int iomgr_success_ignored); + bool iomgr_success_ignored); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value); /** Endpoint callback to process incoming data */ -static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success); +static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success); /** Start disconnection chain */ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); @@ -183,7 +183,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, and maybe they hold resources that need to be freed */ while (t->global.pings.next != &t->global.pings) { grpc_chttp2_outstanding_ping *ping = t->global.pings.next; - grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 0); + grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, false, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -602,7 +602,7 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { t->parsing_active)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); - grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1); + grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL); prevent_endpoint_shutdown(t); } check_read_ops(exec_ctx, &t->global); @@ -631,7 +631,7 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, - void *transport_writing_ptr, int success) { + void *transport_writing_ptr, bool success) { grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); grpc_chttp2_stream_global *stream_global; @@ -669,7 +669,7 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, } static void writing_action(grpc_exec_ctx *exec_ctx, void *gt, - int iomgr_success_ignored) { + bool iomgr_success_ignored) { grpc_chttp2_transport *t = gt; GPR_TIMER_BEGIN("writing_action", 0); grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep); @@ -759,7 +759,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, closure->final_data |= 1; } if (closure->final_data < 2) { - grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0); + grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0, NULL); } *pclosure = NULL; } @@ -777,7 +777,7 @@ static int contains_non_ok_status( return 0; } -static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {} +static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {} static void perform_stream_op_locked( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, @@ -934,7 +934,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 1); + grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, true, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -951,7 +951,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, lock(t); - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( @@ -1022,11 +1022,13 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames); GPR_ASSERT(*stream_global->recv_message != NULL); - grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1); + grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true, + NULL); stream_global->recv_message_ready = NULL; } else if (stream_global->published_trailing_metadata) { *stream_global->recv_message = NULL; - grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1); + grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true, + NULL); stream_global->recv_message_ready = NULL; } } @@ -1336,7 +1338,7 @@ static void read_error_locked(grpc_exec_ctx *exec_ctx, } /* tcp read callback */ -static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { +static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { size_t i; int keep_reading = 0; grpc_chttp2_transport *t = tp; @@ -1523,7 +1525,7 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, unlock(exec_ctx, bs->transport); return 1; } else if (bs->failed) { - grpc_exec_ctx_enqueue(exec_ctx, on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, on_complete, false, NULL); unlock(exec_ctx, bs->transport); return 0; } else { @@ -1552,7 +1554,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&bs->transport->mu); if (bs->on_next != NULL) { *bs->next = slice; - grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 1); + grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, true, NULL); bs->on_next = NULL; } else { gpr_slice_buffer_add(&bs->slices, slice); @@ -1567,7 +1569,7 @@ void grpc_chttp2_incoming_byte_stream_finished( if (from_parsing_thread) { gpr_mu_lock(&bs->transport->mu); } - grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 0); + grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL); bs->on_next = NULL; bs->failed = 1; if (from_parsing_thread) { diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 3c3fd4671d..87765b9799 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -78,7 +78,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, } else { success = 0; } - grpc_exec_ctx_enqueue(exec_ctx, w->notify, success); + grpc_exec_ctx_enqueue(exec_ctx, w->notify, success, NULL); gpr_free(w); } gpr_free(tracker->name); @@ -109,7 +109,7 @@ int grpc_connectivity_state_notify_on_state_change( if (current == NULL) { grpc_connectivity_state_watcher *w = tracker->watchers; if (w != NULL && w->notify == notify) { - grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + grpc_exec_ctx_enqueue(exec_ctx, notify, false, NULL); tracker->watchers = w->next; gpr_free(w); return 0; @@ -117,7 +117,7 @@ int grpc_connectivity_state_notify_on_state_change( while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; if (rm_candidate != NULL && rm_candidate->notify == notify) { - grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + grpc_exec_ctx_enqueue(exec_ctx, notify, false, NULL); w->next = w->next->next; gpr_free(rm_candidate); return 0; @@ -128,7 +128,7 @@ int grpc_connectivity_state_notify_on_state_change( } else { if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_exec_ctx_enqueue(exec_ctx, notify, 1); + grpc_exec_ctx_enqueue(exec_ctx, notify, true, NULL); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -158,7 +158,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; - grpc_exec_ctx_enqueue(exec_ctx, w->notify, 1); + grpc_exec_ctx_enqueue(exec_ctx, w->notify, true, NULL); gpr_free(w); } } diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 2ab978be46..08d685668c 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -59,7 +59,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { #endif if (gpr_unref(&refcount->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, 1); + grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, true, NULL); } } @@ -125,8 +125,8 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { - grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0); - grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); + grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); } void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, @@ -150,7 +150,7 @@ typedef struct { grpc_closure closure; } close_message_data; -static void free_message(grpc_exec_ctx *exec_ctx, void *p, int iomgr_success) { +static void free_message(grpc_exec_ctx *exec_ctx, void *p, bool iomgr_success) { close_message_data *cmd = p; gpr_slice_unref(cmd->message); if (cmd->then_call != NULL) { diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f94f0ae76e..f5cac77adc 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -81,16 +81,29 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount); /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op { + /** Send initial metadata to the peer, from the provided metadata batch. */ grpc_metadata_batch *send_initial_metadata; + + /** Send trailing metadata to the peer, from the provided metadata batch. */ grpc_metadata_batch *send_trailing_metadata; + /** Send message data to the peer, from the provided byte stream. */ grpc_byte_stream *send_message; + /** Receive initial metadata from the stream, into provided metadata batch. */ grpc_metadata_batch *recv_initial_metadata; + + /** Receive message data from the stream, into provided byte stream. */ grpc_byte_stream **recv_message; + /** Should be enqueued when one message is ready to be processed. */ grpc_closure *recv_message_ready; + + /** Receive trailing metadata from the stream, into provided metadata batch. + */ grpc_metadata_batch *recv_trailing_metadata; + /** Should be enqueued when all requested operations (excluding recv_message + which has its own closure) in a given batch have been completed. */ grpc_closure *on_complete; /** If != GRPC_STATUS_OK, cancel this stream */ |