aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/context.c500
-rw-r--r--src/core/census/context.h47
-rw-r--r--src/core/census/grpc_filter.c4
-rw-r--r--src/core/census/placeholders.c9
-rw-r--r--src/core/census/tag_set.c535
-rw-r--r--src/core/channel/http_client_filter.c4
-rw-r--r--src/core/channel/http_server_filter.c4
-rw-r--r--src/core/channel/subchannel_call_holder.c6
-rw-r--r--src/core/iomgr/iocp_windows.c2
-rw-r--r--src/core/iomgr/pollset_posix.c2
-rw-r--r--src/core/iomgr/pollset_windows.c6
-rw-r--r--src/core/iomgr/tcp_server.h11
-rw-r--r--src/core/security/server_auth_filter.c4
-rw-r--r--src/core/security/server_secure_chttp2.c1
-rw-r--r--src/core/support/stack_lockfree.c12
-rw-r--r--src/core/support/sync_win32.c1
-rw-r--r--src/core/support/time.c38
-rw-r--r--src/core/surface/call.c154
-rw-r--r--src/core/surface/lame_client.c3
-rw-r--r--src/core/surface/server.c4
-rw-r--r--src/core/surface/version.c2
-rw-r--r--src/core/transport/chttp2/internal.h2
-rw-r--r--src/core/transport/chttp2/writing.c7
-rw-r--r--src/core/transport/chttp2_transport.c15
-rw-r--r--src/core/transport/static_metadata.c34
-rw-r--r--src/core/transport/static_metadata.h6
-rw-r--r--src/core/transport/transport.c1
-rw-r--r--src/core/transport/transport.h5
28 files changed, 687 insertions, 732 deletions
diff --git a/src/core/census/context.c b/src/core/census/context.c
index cab58b653c..e60330de64 100644
--- a/src/core/census/context.c
+++ b/src/core/census/context.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,16 +31,500 @@
*
*/
-#include "src/core/census/context.h"
-
-#include <string.h>
#include <grpc/census.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/useful.h>
+#include <stdbool.h>
+#include <string.h>
+#include "src/core/support/string.h"
+
+// Functions in this file support the public context API, including
+// encoding/decoding as part of context propagation across RPC's. The overall
+// requirements (in approximate priority order) for the
+// context representation:
+// 1. Efficient conversion to/from wire format
+// 2. Minimal bytes used on-wire
+// 3. Efficient context creation
+// 4. Efficient lookup of tag value for a key
+// 5. Efficient iteration over tags
+// 6. Minimal memory footprint
+//
+// Notes on tradeoffs/decisions:
+// * tag includes 1 byte length of key, as well as nil-terminating byte. These
+// are to aid in efficient parsing and the ability to directly return key
+// strings. This is more important than saving a single byte/tag on the wire.
+// * The wire encoding uses only single byte values. This eliminates the need
+// to handle endian-ness conversions. It also means there is a hard upper
+// limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS.
+// * Keep all tag information (keys/values/flags) in a single memory buffer,
+// that can be directly copied to the wire.
+// * Binary tags share the same structure as, but are encoded separately from,
+// non-binary tags. This is primarily because non-binary tags are far more
+// likely to be repeated across multiple RPC calls, so are more efficiently
+// cached and compressed in any metadata schemes.
+
+// Structure representing a set of tags. Essentially a count of number of tags
+// present, and pointer to a chunk of memory that contains the per-tag details.
+struct tag_set {
+ int ntags; // number of tags.
+ int ntags_alloc; // ntags + number of deleted tags (total number of tags
+ // in all of kvm). This will always be == ntags, except during the process
+ // of building a new tag set.
+ size_t kvm_size; // number of bytes allocated for key/value storage.
+ size_t kvm_used; // number of bytes of used key/value memory
+ char *kvm; // key/value memory. Consists of repeated entries of:
+ // Offset Size Description
+ // 0 1 Key length, including trailing 0. (K)
+ // 1 1 Value length. (V)
+ // 2 1 Flags
+ // 3 K Key bytes
+ // 3 + K V Value bytes
+ //
+ // We refer to the first 3 entries as the 'tag header'. If extra values are
+ // introduced in the header, you will need to modify the TAG_HEADER_SIZE
+ // constant, the raw_tag structure (and everything that uses it) and the
+ // encode/decode functions appropriately.
+};
+
+// Number of bytes in tag header.
+#define TAG_HEADER_SIZE 3 // key length (1) + value length (1) + flags (1)
+// Offsets to tag header entries.
+#define KEY_LEN_OFFSET 0
+#define VALUE_LEN_OFFSET 1
+#define FLAG_OFFSET 2
+
+// raw_tag represents the raw-storage form of a tag in the kvm of a tag_set.
+struct raw_tag {
+ uint8_t key_len;
+ uint8_t value_len;
+ uint8_t flags;
+ char *key;
+ char *value;
+};
+
+// Use a reserved flag bit for indication of deleted tag.
+#define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED
+#define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED)
+
+// Primary (external) representation of a context. Composed of 3 underlying
+// tag_set structs, one for each of the binary/printable propagated tags, and
+// one for everything else. This is to efficiently support tag
+// encoding/decoding.
+struct census_context {
+ struct tag_set tags[3];
+ census_context_status status;
+};
+
+// Indices into the tags member of census_context
+#define PROPAGATED_TAGS 0
+#define PROPAGATED_BINARY_TAGS 1
+#define LOCAL_TAGS 2
+
+// Extract a raw tag given a pointer (raw) to the tag header. Allow for some
+// extra bytes in the tag header (see encode/decode functions for usage: this
+// allows for future expansion of the tag header).
+static char *decode_tag(struct raw_tag *tag, char *header, int offset) {
+ tag->key_len = (uint8_t)(*header++);
+ tag->value_len = (uint8_t)(*header++);
+ tag->flags = (uint8_t)(*header++);
+ header += offset;
+ tag->key = header;
+ header += tag->key_len;
+ tag->value = header;
+ return header + tag->value_len;
+}
-/* Placeholder implementation only. */
+// Make a copy (in 'to') of an existing tag_set.
+static void tag_set_copy(struct tag_set *to, const struct tag_set *from) {
+ memcpy(to, from, sizeof(struct tag_set));
+ to->kvm = gpr_malloc(to->kvm_size);
+ memcpy(to->kvm, from->kvm, from->kvm_used);
+}
+
+// Delete a tag from a tag_set, if it exists (returns true if it did).
+static bool tag_set_delete_tag(struct tag_set *tags, const char *key,
+ size_t key_len) {
+ char *kvp = tags->kvm;
+ for (int i = 0; i < tags->ntags_alloc; i++) {
+ uint8_t *flags = (uint8_t *)(kvp + FLAG_OFFSET);
+ struct raw_tag tag;
+ kvp = decode_tag(&tag, kvp, 0);
+ if (CENSUS_TAG_IS_DELETED(tag.flags)) continue;
+ if ((key_len == tag.key_len) && (memcmp(key, tag.key, key_len) == 0)) {
+ *flags |= CENSUS_TAG_DELETED;
+ tags->ntags--;
+ return true;
+ }
+ }
+ return false;
+}
+
+// Delete a tag from a context, return true if it existed.
+static bool context_delete_tag(census_context *context, const census_tag *tag,
+ size_t key_len) {
+ return (
+ tag_set_delete_tag(&context->tags[LOCAL_TAGS], tag->key, key_len) ||
+ tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len) ||
+ tag_set_delete_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag->key,
+ key_len));
+}
+
+// Add a tag to a tag_set. Return true on success, false if the tag could
+// not be added because of constraints on tag set size. This function should
+// not be called if the tag may already exist (in a non-deleted state) in
+// the tag_set, as that would result in two tags with the same key.
+static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
+ size_t key_len) {
+ if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) {
+ return false;
+ }
+ const size_t tag_size = key_len + tag->value_len + TAG_HEADER_SIZE;
+ if (tags->kvm_used + tag_size > tags->kvm_size) {
+ // allocate new memory if needed
+ tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE;
+ char *new_kvm = gpr_malloc(tags->kvm_size);
+ memcpy(new_kvm, tags->kvm, tags->kvm_used);
+ gpr_free(tags->kvm);
+ tags->kvm = new_kvm;
+ }
+ char *kvp = tags->kvm + tags->kvm_used;
+ *kvp++ = (char)key_len;
+ *kvp++ = (char)tag->value_len;
+ // ensure reserved flags are not used.
+ *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS |
+ CENSUS_TAG_BINARY));
+ memcpy(kvp, tag->key, key_len);
+ kvp += key_len;
+ memcpy(kvp, tag->value, tag->value_len);
+ tags->kvm_used += tag_size;
+ tags->ntags++;
+ tags->ntags_alloc++;
+ return true;
+}
+
+// Add/modify/delete a tag to/in a context. Caller must validate that tag key
+// etc. are valid.
+static void context_modify_tag(census_context *context, const census_tag *tag,
+ size_t key_len) {
+ // First delete the tag if it is already present.
+ bool deleted = context_delete_tag(context, tag, key_len);
+ // Determine if we need to add it back.
+ bool call_add = tag->value != NULL && tag->value_len != 0;
+ bool added = false;
+ if (call_add) {
+ if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) {
+ if (CENSUS_TAG_IS_BINARY(tag->flags)) {
+ added = tag_set_add_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag,
+ key_len);
+ } else {
+ added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len);
+ }
+ } else {
+ added = tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len);
+ }
+ }
+ if (deleted) {
+ if (call_add) {
+ context->status.n_modified_tags++;
+ } else {
+ context->status.n_deleted_tags++;
+ }
+ } else {
+ if (added) {
+ context->status.n_added_tags++;
+ } else {
+ context->status.n_ignored_tags++;
+ }
+ }
+}
+
+// Remove memory used for deleted tags from a tag set. Basic algorithm:
+// 1) Walk through tag set to find first deleted tag. Record where it is.
+// 2) Find the next not-deleted tag. Copy all of kvm from there to the end
+// "over" the deleted tags
+// 3) repeat #1 and #2 until we have seen all tags
+// 4) if we are still looking for a not-deleted tag, then all the end portion
+// of the kvm is deleted. Just reduce the used amount of memory by the
+// appropriate amount.
+static void tag_set_flatten(struct tag_set *tags) {
+ if (tags->ntags == tags->ntags_alloc) return;
+ bool found_deleted = false; // found a deleted tag.
+ char *kvp = tags->kvm;
+ char *dbase = NULL; // record location of deleted tag
+ for (int i = 0; i < tags->ntags_alloc; i++) {
+ struct raw_tag tag;
+ char *next_kvp = decode_tag(&tag, kvp, 0);
+ if (found_deleted) {
+ if (!CENSUS_TAG_IS_DELETED(tag.flags)) {
+ ptrdiff_t reduce = kvp - dbase; // #bytes in deleted tags
+ GPR_ASSERT(reduce > 0);
+ ptrdiff_t copy_size = tags->kvm + tags->kvm_used - kvp;
+ GPR_ASSERT(copy_size > 0);
+ memmove(dbase, kvp, (size_t)copy_size);
+ tags->kvm_used -= (size_t)reduce;
+ next_kvp -= reduce;
+ found_deleted = false;
+ }
+ } else {
+ if (CENSUS_TAG_IS_DELETED(tag.flags)) {
+ dbase = kvp;
+ found_deleted = true;
+ }
+ }
+ kvp = next_kvp;
+ }
+ if (found_deleted) {
+ GPR_ASSERT(dbase > tags->kvm);
+ tags->kvm_used = (size_t)(dbase - tags->kvm);
+ }
+ tags->ntags_alloc = tags->ntags;
+}
-size_t census_context_serialize(const census_context *context, char *buffer,
- size_t buf_size) {
- /* TODO(aveitch): implement serialization */
+census_context *census_context_create(const census_context *base,
+ const census_tag *tags, int ntags,
+ census_context_status const **status) {
+ census_context *context = gpr_malloc(sizeof(census_context));
+ // If we are given a base, copy it into our new tag set. Otherwise set it
+ // to zero/NULL everything.
+ if (base == NULL) {
+ memset(context, 0, sizeof(census_context));
+ } else {
+ tag_set_copy(&context->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]);
+ tag_set_copy(&context->tags[PROPAGATED_BINARY_TAGS],
+ &base->tags[PROPAGATED_BINARY_TAGS]);
+ tag_set_copy(&context->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]);
+ memset(&context->status, 0, sizeof(context->status));
+ }
+ // Walk over the additional tags and, for those that aren't invalid, modify
+ // the context to add/replace/delete as required.
+ for (int i = 0; i < ntags; i++) {
+ const census_tag *tag = &tags[i];
+ size_t key_len = strlen(tag->key) + 1;
+ // ignore the tag if it is too long/short.
+ if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN &&
+ tag->value_len <= CENSUS_MAX_TAG_KV_LEN) {
+ context_modify_tag(context, tag, key_len);
+ } else {
+ context->status.n_invalid_tags++;
+ }
+ }
+ // Remove any deleted tags, update status if needed, and return.
+ tag_set_flatten(&context->tags[PROPAGATED_TAGS]);
+ tag_set_flatten(&context->tags[PROPAGATED_BINARY_TAGS]);
+ tag_set_flatten(&context->tags[LOCAL_TAGS]);
+ context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags;
+ context->status.n_propagated_binary_tags =
+ context->tags[PROPAGATED_BINARY_TAGS].ntags;
+ context->status.n_local_tags = context->tags[LOCAL_TAGS].ntags;
+ if (status) {
+ *status = &context->status;
+ }
+ return context;
+}
+
+const census_context_status *census_context_get_status(
+ const census_context *context) {
+ return &context->status;
+}
+
+void census_context_destroy(census_context *context) {
+ gpr_free(context->tags[PROPAGATED_TAGS].kvm);
+ gpr_free(context->tags[PROPAGATED_BINARY_TAGS].kvm);
+ gpr_free(context->tags[LOCAL_TAGS].kvm);
+ gpr_free(context);
+}
+
+void census_context_initialize_iterator(const census_context *context,
+ census_context_iterator *iterator) {
+ iterator->context = context;
+ iterator->index = 0;
+ if (context->tags[PROPAGATED_TAGS].ntags != 0) {
+ iterator->base = PROPAGATED_TAGS;
+ iterator->kvm = context->tags[PROPAGATED_TAGS].kvm;
+ } else if (context->tags[PROPAGATED_BINARY_TAGS].ntags != 0) {
+ iterator->base = PROPAGATED_BINARY_TAGS;
+ iterator->kvm = context->tags[PROPAGATED_BINARY_TAGS].kvm;
+ } else if (context->tags[LOCAL_TAGS].ntags != 0) {
+ iterator->base = LOCAL_TAGS;
+ iterator->kvm = context->tags[LOCAL_TAGS].kvm;
+ } else {
+ iterator->base = -1;
+ }
+}
+
+int census_context_next_tag(census_context_iterator *iterator,
+ census_tag *tag) {
+ if (iterator->base < 0) {
+ return 0;
+ }
+ struct raw_tag raw;
+ iterator->kvm = decode_tag(&raw, iterator->kvm, 0);
+ tag->key = raw.key;
+ tag->value = raw.value;
+ tag->value_len = raw.value_len;
+ tag->flags = raw.flags;
+ if (++iterator->index == iterator->context->tags[iterator->base].ntags) {
+ do {
+ if (iterator->base == LOCAL_TAGS) {
+ iterator->base = -1;
+ return 1;
+ }
+ } while (iterator->context->tags[++iterator->base].ntags == 0);
+ iterator->index = 0;
+ iterator->kvm = iterator->context->tags[iterator->base].kvm;
+ }
+ return 1;
+}
+
+// Find a tag in a tag_set by key. Return true if found, false otherwise.
+static bool tag_set_get_tag(const struct tag_set *tags, const char *key,
+ size_t key_len, census_tag *tag) {
+ char *kvp = tags->kvm;
+ for (int i = 0; i < tags->ntags; i++) {
+ struct raw_tag raw;
+ kvp = decode_tag(&raw, kvp, 0);
+ if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) {
+ tag->key = raw.key;
+ tag->value = raw.value;
+ tag->value_len = raw.value_len;
+ tag->flags = raw.flags;
+ return true;
+ }
+ }
+ return false;
+}
+
+int census_context_get_tag(const census_context *context, const char *key,
+ census_tag *tag) {
+ size_t key_len = strlen(key) + 1;
+ if (key_len == 1) {
+ return 0;
+ }
+ if (tag_set_get_tag(&context->tags[PROPAGATED_TAGS], key, key_len, tag) ||
+ tag_set_get_tag(&context->tags[PROPAGATED_BINARY_TAGS], key, key_len,
+ tag) ||
+ tag_set_get_tag(&context->tags[LOCAL_TAGS], key, key_len, tag)) {
+ return 1;
+ }
return 0;
}
+
+// Context encoding and decoding functions.
+//
+// Wire format for tag_set's on the wire:
+//
+// First, a tag set header:
+//
+// offset bytes description
+// 0 1 version number
+// 1 1 number of bytes in this header. This allows for future
+// expansion.
+// 2 1 number of bytes in each tag header.
+// 3 1 ntags value from tag set.
+//
+// This is followed by the key/value memory from struct tag_set.
+
+#define ENCODED_VERSION 0 // Version number
+#define ENCODED_HEADER_SIZE 4 // size of tag set header
+
+// Encode a tag set. Returns 0 if buffer is too small.
+static size_t tag_set_encode(const struct tag_set *tags, char *buffer,
+ size_t buf_size) {
+ if (buf_size < ENCODED_HEADER_SIZE + tags->kvm_used) {
+ return 0;
+ }
+ buf_size -= ENCODED_HEADER_SIZE;
+ *buffer++ = (char)ENCODED_VERSION;
+ *buffer++ = (char)ENCODED_HEADER_SIZE;
+ *buffer++ = (char)TAG_HEADER_SIZE;
+ *buffer++ = (char)tags->ntags;
+ if (tags->ntags == 0) {
+ return ENCODED_HEADER_SIZE;
+ }
+ memcpy(buffer, tags->kvm, tags->kvm_used);
+ return ENCODED_HEADER_SIZE + tags->kvm_used;
+}
+
+char *census_context_encode(const census_context *context, char *buffer,
+ size_t buf_size, size_t *print_buf_size,
+ size_t *bin_buf_size) {
+ *print_buf_size =
+ tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size);
+ if (*print_buf_size == 0) {
+ return NULL;
+ }
+ char *b_buffer = buffer + *print_buf_size;
+ *bin_buf_size = tag_set_encode(&context->tags[PROPAGATED_BINARY_TAGS],
+ b_buffer, buf_size - *print_buf_size);
+ if (*bin_buf_size == 0) {
+ return NULL;
+ }
+ return b_buffer;
+}
+
+// Decode a tag set.
+static void tag_set_decode(struct tag_set *tags, const char *buffer,
+ size_t size) {
+ uint8_t version = (uint8_t)(*buffer++);
+ uint8_t header_size = (uint8_t)(*buffer++);
+ uint8_t tag_header_size = (uint8_t)(*buffer++);
+ tags->ntags = tags->ntags_alloc = (int)(*buffer++);
+ if (tags->ntags == 0) {
+ tags->ntags_alloc = 0;
+ tags->kvm_size = 0;
+ tags->kvm_used = 0;
+ tags->kvm = NULL;
+ return;
+ }
+ if (header_size != ENCODED_HEADER_SIZE) {
+ GPR_ASSERT(version != ENCODED_VERSION);
+ GPR_ASSERT(ENCODED_HEADER_SIZE < header_size);
+ buffer += (header_size - ENCODED_HEADER_SIZE);
+ }
+ tags->kvm_used = size - header_size;
+ tags->kvm_size = tags->kvm_used + CENSUS_MAX_TAG_KV_LEN;
+ tags->kvm = gpr_malloc(tags->kvm_size);
+ if (tag_header_size != TAG_HEADER_SIZE) {
+ // something new in the tag information. I don't understand it, so
+ // don't copy it over.
+ GPR_ASSERT(version != ENCODED_VERSION);
+ GPR_ASSERT(tag_header_size > TAG_HEADER_SIZE);
+ char *kvp = tags->kvm;
+ for (int i = 0; i < tags->ntags; i++) {
+ memcpy(kvp, buffer, TAG_HEADER_SIZE);
+ kvp += header_size;
+ struct raw_tag raw;
+ buffer =
+ decode_tag(&raw, (char *)buffer, tag_header_size - TAG_HEADER_SIZE);
+ memcpy(kvp, raw.key, (size_t)raw.key_len + raw.value_len);
+ kvp += raw.key_len + raw.value_len;
+ }
+ } else {
+ memcpy(tags->kvm, buffer, tags->kvm_used);
+ }
+}
+
+census_context *census_context_decode(const char *buffer, size_t size,
+ const char *bin_buffer, size_t bin_size) {
+ census_context *context = gpr_malloc(sizeof(census_context));
+ memset(&context->tags[LOCAL_TAGS], 0, sizeof(struct tag_set));
+ if (buffer == NULL) {
+ memset(&context->tags[PROPAGATED_TAGS], 0, sizeof(struct tag_set));
+ } else {
+ tag_set_decode(&context->tags[PROPAGATED_TAGS], buffer, size);
+ }
+ if (bin_buffer == NULL) {
+ memset(&context->tags[PROPAGATED_BINARY_TAGS], 0, sizeof(struct tag_set));
+ } else {
+ tag_set_decode(&context->tags[PROPAGATED_BINARY_TAGS], bin_buffer,
+ bin_size);
+ }
+ memset(&context->status, 0, sizeof(context->status));
+ context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags;
+ context->status.n_propagated_binary_tags =
+ context->tags[PROPAGATED_BINARY_TAGS].ntags;
+ // TODO(aveitch): check that BINARY flag is correct for each type.
+ return context;
+}
diff --git a/src/core/census/context.h b/src/core/census/context.h
deleted file mode 100644
index 700bcf86cf..0000000000
--- a/src/core/census/context.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H
-#define GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H
-
-#include <grpc/census.h>
-
-#define GRPC_CENSUS_MAX_ON_THE_WIRE_TAG_BYTES 2048
-
-/* census_context is the in-memory representation of information needed to
- * maintain tracing, RPC statistics and resource usage information. */
-struct census_context {
- census_tag_set *tags; /* Opaque data structure for census tags. */
-};
-
-#endif /* GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H */
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c
index a8db32b9d5..c8aaf31e2d 100644
--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -107,8 +107,8 @@ static void server_mutate_op(grpc_call_element *elem,
if (op->recv_initial_metadata) {
/* substitute our callback for the op callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->on_complete;
- op->on_complete = &calld->finish_recv;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->finish_recv;
}
}
diff --git a/src/core/census/placeholders.c b/src/core/census/placeholders.c
index 5829cc9460..fe23d13971 100644
--- a/src/core/census/placeholders.c
+++ b/src/core/census/placeholders.c
@@ -37,11 +37,6 @@
/* Placeholders for the pending APIs */
-census_tag_set *census_context_tag_set(census_context *context) {
- (void)context;
- abort();
-}
-
int census_get_trace_record(census_trace_record *trace_record) {
(void)trace_record;
abort();
@@ -73,7 +68,7 @@ const census_aggregation *census_view_aggregrations(const census_view *view) {
abort();
}
-census_view *census_view_create(uint32_t metric_id, const census_tag_set *tags,
+census_view *census_view_create(uint32_t metric_id, const census_context *tags,
const census_aggregation *aggregations,
size_t naggregations) {
(void)metric_id;
@@ -83,7 +78,7 @@ census_view *census_view_create(uint32_t metric_id, const census_tag_set *tags,
abort();
}
-const census_tag_set *census_view_tags(const census_view *view) {
+const census_context *census_view_tags(const census_view *view) {
(void)view;
abort();
}
diff --git a/src/core/census/tag_set.c b/src/core/census/tag_set.c
deleted file mode 100644
index 9b65a1dfa1..0000000000
--- a/src/core/census/tag_set.c
+++ /dev/null
@@ -1,535 +0,0 @@
-/*
- *
- * Copyright 2015-2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/census.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/port_platform.h>
-#include <grpc/support/useful.h>
-#include <stdbool.h>
-#include <string.h>
-#include "src/core/support/string.h"
-
-// Functions in this file support the public tag_set API, as well as
-// encoding/decoding tag_sets as part of context propagation across
-// RPC's. The overall requirements (in approximate priority order) for the
-// tag_set representations:
-// 1. Efficient conversion to/from wire format
-// 2. Minimal bytes used on-wire
-// 3. Efficient tag set creation
-// 4. Efficient lookup of value for a key
-// 5. Efficient lookup of value for an index (to support iteration)
-// 6. Minimal memory footprint
-//
-// Notes on tradeoffs/decisions:
-// * tag includes 1 byte length of key, as well as nil-terminating byte. These
-// are to aid in efficient parsing and the ability to directly return key
-// strings. This is more important than saving a single byte/tag on the wire.
-// * The wire encoding uses only single byte values. This eliminates the need
-// to handle endian-ness conversions. It also means there is a hard upper
-// limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS.
-// * Keep all tag information (keys/values/flags) in a single memory buffer,
-// that can be directly copied to the wire.14
-// * Binary tags share the same structure as, but are encoded separately from,
-// non-binary tags. This is primarily because non-binary tags are far more
-// likely to be repeated across multiple RPC calls, so are more efficiently
-// cached and compressed in any metadata schemes.
-// * all lengths etc. are restricted to one byte. This eliminates endian
-// issues.
-
-// Structure representing a set of tags. Essentially a count of number of tags
-// present, and pointer to a chunk of memory that contains the per-tag details.
-struct tag_set {
- int ntags; // number of tags.
- int ntags_alloc; // ntags + number of deleted tags (total number of tags
- // in all of kvm). This will always be == ntags, except during the process
- // of building a new tag set.
- size_t kvm_size; // number of bytes allocated for key/value storage.
- size_t kvm_used; // number of bytes of used key/value memory
- char *kvm; // key/value memory. Consists of repeated entries of:
- // Offset Size Description
- // 0 1 Key length, including trailing 0. (K)
- // 1 1 Value length. (V)
- // 2 1 Flags
- // 3 K Key bytes
- // 3 + K V Value bytes
- //
- // We refer to the first 3 entries as the 'tag header'. If extra values are
- // introduced in the header, you will need to modify the TAG_HEADER_SIZE
- // constant, the raw_tag structure (and everything that uses it) and the
- // encode/decode functions appropriately.
-};
-
-// Number of bytes in tag header.
-#define TAG_HEADER_SIZE 3 // key length (1) + value length (1) + flags (1)
-// Offsets to tag header entries.
-#define KEY_LEN_OFFSET 0
-#define VALUE_LEN_OFFSET 1
-#define FLAG_OFFSET 2
-
-// raw_tag represents the raw-storage form of a tag in the kvm of a tag_set.
-struct raw_tag {
- uint8_t key_len;
- uint8_t value_len;
- uint8_t flags;
- char *key;
- char *value;
-};
-
-// Use a reserved flag bit for indication of deleted tag.
-#define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED
-#define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED)
-
-// Primary (external) representation of a tag set. Composed of 3 underlying
-// tag_set structs, one for each of the binary/printable propagated tags, and
-// one for everything else. This is to efficiently support tag
-// encoding/decoding.
-struct census_tag_set {
- struct tag_set tags[3];
- census_tag_set_create_status status;
-};
-
-// Indices into the tags member of census_tag_set
-#define PROPAGATED_TAGS 0
-#define PROPAGATED_BINARY_TAGS 1
-#define LOCAL_TAGS 2
-
-// Extract a raw tag given a pointer (raw) to the tag header. Allow for some
-// extra bytes in the tag header (see encode/decode functions for usage: this
-// allows for future expansion of the tag header).
-static char *decode_tag(struct raw_tag *tag, char *header, int offset) {
- tag->key_len = (uint8_t)(*header++);
- tag->value_len = (uint8_t)(*header++);
- tag->flags = (uint8_t)(*header++);
- header += offset;
- tag->key = header;
- header += tag->key_len;
- tag->value = header;
- return header + tag->value_len;
-}
-
-// Make a copy (in 'to') of an existing tag_set.
-static void tag_set_copy(struct tag_set *to, const struct tag_set *from) {
- memcpy(to, from, sizeof(struct tag_set));
- to->kvm = gpr_malloc(to->kvm_size);
- memcpy(to->kvm, from->kvm, from->kvm_used);
-}
-
-// Delete a tag from a tag_set, if it exists (returns true if it did).
-static bool tag_set_delete_tag(struct tag_set *tags, const char *key,
- size_t key_len) {
- char *kvp = tags->kvm;
- for (int i = 0; i < tags->ntags_alloc; i++) {
- uint8_t *flags = (uint8_t *)(kvp + FLAG_OFFSET);
- struct raw_tag tag;
- kvp = decode_tag(&tag, kvp, 0);
- if (CENSUS_TAG_IS_DELETED(tag.flags)) continue;
- if ((key_len == tag.key_len) && (memcmp(key, tag.key, key_len) == 0)) {
- *flags |= CENSUS_TAG_DELETED;
- tags->ntags--;
- return true;
- }
- }
- return false;
-}
-
-// Delete a tag from a census_tag_set, return true if it existed.
-static bool cts_delete_tag(census_tag_set *tags, const census_tag *tag,
- size_t key_len) {
- return (tag_set_delete_tag(&tags->tags[LOCAL_TAGS], tag->key, key_len) ||
- tag_set_delete_tag(&tags->tags[PROPAGATED_TAGS], tag->key, key_len) ||
- tag_set_delete_tag(&tags->tags[PROPAGATED_BINARY_TAGS], tag->key,
- key_len));
-}
-
-// Add a tag to a tag_set. Return true on sucess, false if the tag could
-// not be added because of constraints on tag set size. This function should
-// not be called if the tag may already exist (in a non-deleted state) in
-// the tag_set, as that would result in two tags with the same key.
-static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
- size_t key_len) {
- if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) {
- return false;
- }
- const size_t tag_size = key_len + tag->value_len + TAG_HEADER_SIZE;
- if (tags->kvm_used + tag_size > tags->kvm_size) {
- // allocate new memory if needed
- tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE;
- char *new_kvm = gpr_malloc(tags->kvm_size);
- memcpy(new_kvm, tags->kvm, tags->kvm_used);
- gpr_free(tags->kvm);
- tags->kvm = new_kvm;
- }
- char *kvp = tags->kvm + tags->kvm_used;
- *kvp++ = (char)key_len;
- *kvp++ = (char)tag->value_len;
- // ensure reserved flags are not used.
- *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS |
- CENSUS_TAG_BINARY));
- memcpy(kvp, tag->key, key_len);
- kvp += key_len;
- memcpy(kvp, tag->value, tag->value_len);
- tags->kvm_used += tag_size;
- tags->ntags++;
- tags->ntags_alloc++;
- return true;
-}
-
-// Add/modify/delete a tag to/in a census_tag_set. Caller must validate that
-// tag key etc. are valid.
-static void cts_modify_tag(census_tag_set *tags, const census_tag *tag,
- size_t key_len) {
- // First delete the tag if it is already present.
- bool deleted = cts_delete_tag(tags, tag, key_len);
- // Determine if we need to add it back.
- bool call_add = tag->value != NULL && tag->value_len != 0;
- bool added = false;
- if (call_add) {
- if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) {
- if (CENSUS_TAG_IS_BINARY(tag->flags)) {
- added =
- tag_set_add_tag(&tags->tags[PROPAGATED_BINARY_TAGS], tag, key_len);
- } else {
- added = tag_set_add_tag(&tags->tags[PROPAGATED_TAGS], tag, key_len);
- }
- } else {
- added = tag_set_add_tag(&tags->tags[LOCAL_TAGS], tag, key_len);
- }
- }
- if (deleted) {
- if (call_add) {
- tags->status.n_modified_tags++;
- } else {
- tags->status.n_deleted_tags++;
- }
- } else {
- if (added) {
- tags->status.n_added_tags++;
- } else {
- tags->status.n_ignored_tags++;
- }
- }
-}
-
-// Remove memory used for deleted tags from the tag set. Basic algorithm:
-// 1) Walk through tag set to find first deleted tag. Record where it is.
-// 2) Find the next not-deleted tag. Copy all of kvm from there to the end
-// "over" the deleted tags
-// 3) repeat #1 and #2 until we have seen all tags
-// 4) if we are still looking for a not-deleted tag, then all the end portion
-// of the kvm is deleted. Just reduce the used amount of memory by the
-// appropriate amount.
-static void tag_set_flatten(struct tag_set *tags) {
- if (tags->ntags == tags->ntags_alloc) return;
- bool found_deleted = false; // found a deleted tag.
- char *kvp = tags->kvm;
- char *dbase = NULL; // record location of deleted tag
- for (int i = 0; i < tags->ntags_alloc; i++) {
- struct raw_tag tag;
- char *next_kvp = decode_tag(&tag, kvp, 0);
- if (found_deleted) {
- if (!CENSUS_TAG_IS_DELETED(tag.flags)) {
- ptrdiff_t reduce = kvp - dbase; // #bytes in deleted tags
- GPR_ASSERT(reduce > 0);
- ptrdiff_t copy_size = tags->kvm + tags->kvm_used - kvp;
- GPR_ASSERT(copy_size > 0);
- memmove(dbase, kvp, (size_t)copy_size);
- tags->kvm_used -= (size_t)reduce;
- next_kvp -= reduce;
- found_deleted = false;
- }
- } else {
- if (CENSUS_TAG_IS_DELETED(tag.flags)) {
- dbase = kvp;
- found_deleted = true;
- }
- }
- kvp = next_kvp;
- }
- if (found_deleted) {
- GPR_ASSERT(dbase > tags->kvm);
- tags->kvm_used = (size_t)(dbase - tags->kvm);
- }
- tags->ntags_alloc = tags->ntags;
-}
-
-census_tag_set *census_tag_set_create(
- const census_tag_set *base, const census_tag *tags, int ntags,
- census_tag_set_create_status const **status) {
- census_tag_set *new_ts = gpr_malloc(sizeof(census_tag_set));
- // If we are given a base, copy it into our new tag set. Otherwise set it
- // to zero/NULL everything.
- if (base == NULL) {
- memset(new_ts, 0, sizeof(census_tag_set));
- } else {
- tag_set_copy(&new_ts->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]);
- tag_set_copy(&new_ts->tags[PROPAGATED_BINARY_TAGS],
- &base->tags[PROPAGATED_BINARY_TAGS]);
- tag_set_copy(&new_ts->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]);
- memset(&new_ts->status, 0, sizeof(new_ts->status));
- }
- // Walk over the additional tags and, for those that aren't invalid, modify
- // the tag set to add/replace/delete as required.
- for (int i = 0; i < ntags; i++) {
- const census_tag *tag = &tags[i];
- size_t key_len = strlen(tag->key) + 1;
- // ignore the tag if it is too long/short.
- if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN &&
- tag->value_len <= CENSUS_MAX_TAG_KV_LEN) {
- cts_modify_tag(new_ts, tag, key_len);
- } else {
- new_ts->status.n_invalid_tags++;
- }
- }
- // Remove any deleted tags, update status if needed, and return.
- tag_set_flatten(&new_ts->tags[PROPAGATED_TAGS]);
- tag_set_flatten(&new_ts->tags[PROPAGATED_BINARY_TAGS]);
- tag_set_flatten(&new_ts->tags[LOCAL_TAGS]);
- new_ts->status.n_propagated_tags = new_ts->tags[PROPAGATED_TAGS].ntags;
- new_ts->status.n_propagated_binary_tags =
- new_ts->tags[PROPAGATED_BINARY_TAGS].ntags;
- new_ts->status.n_local_tags = new_ts->tags[LOCAL_TAGS].ntags;
- if (status) {
- *status = &new_ts->status;
- }
- return new_ts;
-}
-
-const census_tag_set_create_status *census_tag_set_get_create_status(
- const census_tag_set *tags) {
- return &tags->status;
-}
-
-void census_tag_set_destroy(census_tag_set *tags) {
- gpr_free(tags->tags[PROPAGATED_TAGS].kvm);
- gpr_free(tags->tags[PROPAGATED_BINARY_TAGS].kvm);
- gpr_free(tags->tags[LOCAL_TAGS].kvm);
- gpr_free(tags);
-}
-
-// Initialize a tag set iterator. Must be called before first use of the
-// iterator.
-void census_tag_set_initialize_iterator(const census_tag_set *tags,
- census_tag_set_iterator *iterator) {
- iterator->tags = tags;
- iterator->index = 0;
- if (tags->tags[PROPAGATED_TAGS].ntags != 0) {
- iterator->base = PROPAGATED_TAGS;
- iterator->kvm = tags->tags[PROPAGATED_TAGS].kvm;
- } else if (tags->tags[PROPAGATED_BINARY_TAGS].ntags != 0) {
- iterator->base = PROPAGATED_BINARY_TAGS;
- iterator->kvm = tags->tags[PROPAGATED_BINARY_TAGS].kvm;
- } else if (tags->tags[LOCAL_TAGS].ntags != 0) {
- iterator->base = LOCAL_TAGS;
- iterator->kvm = tags->tags[LOCAL_TAGS].kvm;
- } else {
- iterator->base = -1;
- }
-}
-
-// Get the contents of the "next" tag in the tag set. If there are no more
-// tags in the tag set, returns 0 (and 'tag' contents will be unchanged),
-// otherwise returns 1. */
-int census_tag_set_next_tag(census_tag_set_iterator *iterator,
- census_tag *tag) {
- if (iterator->base < 0) {
- return 0;
- }
- struct raw_tag raw;
- iterator->kvm = decode_tag(&raw, iterator->kvm, 0);
- tag->key = raw.key;
- tag->value = raw.value;
- tag->value_len = raw.value_len;
- tag->flags = raw.flags;
- if (++iterator->index == iterator->tags->tags[iterator->base].ntags) {
- do {
- if (iterator->base == LOCAL_TAGS) {
- iterator->base = -1;
- return 1;
- }
- } while (iterator->tags->tags[++iterator->base].ntags == 0);
- iterator->index = 0;
- iterator->kvm = iterator->tags->tags[iterator->base].kvm;
- }
- return 1;
-}
-
-// Find a tag in a tag_set by key. Return true if found, false otherwise.
-static bool tag_set_get_tag_by_key(const struct tag_set *tags, const char *key,
- size_t key_len, census_tag *tag) {
- char *kvp = tags->kvm;
- for (int i = 0; i < tags->ntags; i++) {
- struct raw_tag raw;
- kvp = decode_tag(&raw, kvp, 0);
- if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) {
- tag->key = raw.key;
- tag->value = raw.value;
- tag->value_len = raw.value_len;
- tag->flags = raw.flags;
- return true;
- }
- }
- return false;
-}
-
-int census_tag_set_get_tag_by_key(const census_tag_set *tags, const char *key,
- census_tag *tag) {
- size_t key_len = strlen(key) + 1;
- if (key_len == 1) {
- return 0;
- }
- if (tag_set_get_tag_by_key(&tags->tags[PROPAGATED_TAGS], key, key_len, tag) ||
- tag_set_get_tag_by_key(&tags->tags[PROPAGATED_BINARY_TAGS], key, key_len,
- tag) ||
- tag_set_get_tag_by_key(&tags->tags[LOCAL_TAGS], key, key_len, tag)) {
- return 1;
- }
- return 0;
-}
-
-// tag_set encoding and decoding functions.
-//
-// Wire format for tag sets on the wire:
-//
-// First, a tag set header:
-//
-// offset bytes description
-// 0 1 version number
-// 1 1 number of bytes in this header. This allows for future
-// expansion.
-// 2 1 number of bytes in each tag header.
-// 3 1 ntags value from tag set.
-//
-// This is followed by the key/value memory from struct tag_set.
-
-#define ENCODED_VERSION 0 // Version number
-#define ENCODED_HEADER_SIZE 4 // size of tag set header
-
-// Encode a tag set. Returns 0 if buffer is too small.
-static size_t tag_set_encode(const struct tag_set *tags, char *buffer,
- size_t buf_size) {
- if (buf_size < ENCODED_HEADER_SIZE + tags->kvm_used) {
- return 0;
- }
- buf_size -= ENCODED_HEADER_SIZE;
- *buffer++ = (char)ENCODED_VERSION;
- *buffer++ = (char)ENCODED_HEADER_SIZE;
- *buffer++ = (char)TAG_HEADER_SIZE;
- *buffer++ = (char)tags->ntags;
- if (tags->ntags == 0) {
- return ENCODED_HEADER_SIZE;
- }
- memcpy(buffer, tags->kvm, tags->kvm_used);
- return ENCODED_HEADER_SIZE + tags->kvm_used;
-}
-
-char *census_tag_set_encode(const census_tag_set *tags, char *buffer,
- size_t buf_size, size_t *print_buf_size,
- size_t *bin_buf_size) {
- *print_buf_size =
- tag_set_encode(&tags->tags[PROPAGATED_TAGS], buffer, buf_size);
- if (*print_buf_size == 0) {
- return NULL;
- }
- char *b_buffer = buffer + *print_buf_size;
- *bin_buf_size = tag_set_encode(&tags->tags[PROPAGATED_BINARY_TAGS], b_buffer,
- buf_size - *print_buf_size);
- if (*bin_buf_size == 0) {
- return NULL;
- }
- return b_buffer;
-}
-
-// Decode a tag set.
-static void tag_set_decode(struct tag_set *tags, const char *buffer,
- size_t size) {
- uint8_t version = (uint8_t)(*buffer++);
- uint8_t header_size = (uint8_t)(*buffer++);
- uint8_t tag_header_size = (uint8_t)(*buffer++);
- tags->ntags = tags->ntags_alloc = (int)(*buffer++);
- if (tags->ntags == 0) {
- tags->ntags_alloc = 0;
- tags->kvm_size = 0;
- tags->kvm_used = 0;
- tags->kvm = NULL;
- return;
- }
- if (header_size != ENCODED_HEADER_SIZE) {
- GPR_ASSERT(version != ENCODED_VERSION);
- GPR_ASSERT(ENCODED_HEADER_SIZE < header_size);
- buffer += (header_size - ENCODED_HEADER_SIZE);
- }
- tags->kvm_used = size - header_size;
- tags->kvm_size = tags->kvm_used + CENSUS_MAX_TAG_KV_LEN;
- tags->kvm = gpr_malloc(tags->kvm_size);
- if (tag_header_size != TAG_HEADER_SIZE) {
- // something new in the tag information. I don't understand it, so
- // don't copy it over.
- GPR_ASSERT(version != ENCODED_VERSION);
- GPR_ASSERT(tag_header_size > TAG_HEADER_SIZE);
- char *kvp = tags->kvm;
- for (int i = 0; i < tags->ntags; i++) {
- memcpy(kvp, buffer, TAG_HEADER_SIZE);
- kvp += header_size;
- struct raw_tag raw;
- buffer =
- decode_tag(&raw, (char *)buffer, tag_header_size - TAG_HEADER_SIZE);
- memcpy(kvp, raw.key, (size_t)raw.key_len + raw.value_len);
- kvp += raw.key_len + raw.value_len;
- }
- } else {
- memcpy(tags->kvm, buffer, tags->kvm_used);
- }
-}
-
-census_tag_set *census_tag_set_decode(const char *buffer, size_t size,
- const char *bin_buffer, size_t bin_size) {
- census_tag_set *new_ts = gpr_malloc(sizeof(census_tag_set));
- memset(&new_ts->tags[LOCAL_TAGS], 0, sizeof(struct tag_set));
- if (buffer == NULL) {
- memset(&new_ts->tags[PROPAGATED_TAGS], 0, sizeof(struct tag_set));
- } else {
- tag_set_decode(&new_ts->tags[PROPAGATED_TAGS], buffer, size);
- }
- if (bin_buffer == NULL) {
- memset(&new_ts->tags[PROPAGATED_BINARY_TAGS], 0, sizeof(struct tag_set));
- } else {
- tag_set_decode(&new_ts->tags[PROPAGATED_BINARY_TAGS], bin_buffer, bin_size);
- }
- memset(&new_ts->status, 0, sizeof(new_ts->status));
- new_ts->status.n_propagated_tags = new_ts->tags[PROPAGATED_TAGS].ntags;
- new_ts->status.n_propagated_binary_tags =
- new_ts->tags[PROPAGATED_BINARY_TAGS].ntags;
- // TODO(aveitch): check that BINARY flag is correct for each type.
- return new_ts;
-}
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 43eee046b8..1aa27208c2 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -127,8 +127,8 @@ static void hc_mutate_op(grpc_call_element *elem,
if (op->recv_initial_metadata != NULL) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->on_complete;
- op->on_complete = &calld->hc_on_recv;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->hc_on_recv;
}
}
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index bb75323933..370f8dbe42 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -186,8 +186,8 @@ static void hs_mutate_op(grpc_call_element *elem,
if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->on_complete;
- op->on_complete = &calld->hs_on_recv;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->hs_on_recv;
}
}
diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c
index 3ad9fd9efb..81297c8d44 100644
--- a/src/core/channel/subchannel_call_holder.c
+++ b/src/core/channel/subchannel_call_holder.c
@@ -241,10 +241,8 @@ static void fail_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder) {
size_t i;
for (i = 0; i < holder->waiting_ops_count; i++) {
- grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false,
- NULL);
- grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready,
- false, NULL);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx,
+ &holder->waiting_ops[i]);
}
holder->waiting_ops_count = 0;
}
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 96b6f81024..759340e00e 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -57,7 +57,7 @@ static HANDLE g_iocp;
static DWORD deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
- static const int max_spin_polling_us = 10;
+ static const int64_t max_spin_polling_us = 10;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
return INFINITE;
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index a8e2e22977..19ee6650f0 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -393,7 +393,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
- static const int max_spin_polling_us = 10;
+ static const int64_t max_spin_polling_us = 10;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
return -1;
}
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 2e650ca939..02c6678363 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -212,10 +212,8 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
grpc_iocp_kick();
}
} else {
- if (p->is_iocp_worker) {
- if (g_active_poller == specific_worker) {
- grpc_iocp_kick();
- }
+ if (p->is_iocp_worker && g_active_poller == specific_worker) {
+ grpc_iocp_kick();
} else {
specific_worker->kicked = 1;
gpr_cv_signal(&specific_worker->cv);
diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h
index 8f3184ff1e..a39dd3bafc 100644
--- a/src/core/iomgr/tcp_server.h
+++ b/src/core/iomgr/tcp_server.h
@@ -40,15 +40,14 @@
/* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server;
-typedef struct grpc_tcp_server_acceptor grpc_tcp_server_acceptor;
-struct grpc_tcp_server_acceptor {
+typedef struct grpc_tcp_server_acceptor {
/* grpc_tcp_server_cb functions share a ref on from_server that is valid
until the function returns. */
grpc_tcp_server *from_server;
/* Indices that may be passed to grpc_tcp_server_port_fd(). */
unsigned port_index;
unsigned fd_index;
-};
+} grpc_tcp_server_acceptor;
/* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
@@ -57,7 +56,7 @@ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
/* Create a server, initially not bound to any ports. The caller owns one ref.
If shutdown_complete is not NULL, it will be used by
- grpc_tcp_server_unref(). */
+ grpc_tcp_server_unref() when the ref count reaches zero. */
grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete);
/* Start listening to bound ports */
@@ -84,7 +83,7 @@ unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, unsigned port_index);
/* Returns the file descriptor of the Mth (fd_index) listening socket of the Nth
(port_index) call to add_port() on this server, or -1 if the indices are out
of bounds. The file descriptor remains owned by the server, and will be
- cleaned up when grpc_tcp_server_destroy is called. */
+ cleaned up when the ref count reaches zero. */
int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index);
@@ -97,7 +96,7 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s);
void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
grpc_closure *shutdown_starting);
-/* If the recount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue
+/* If the refcount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue
a call (exec_ctx!=NULL) to shutdown_complete. */
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s);
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 4c78711387..3d8e5e8d35 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -176,8 +176,8 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem,
if (op->recv_initial_metadata != NULL) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->on_complete;
- op->on_complete = &calld->auth_on_recv;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->auth_on_recv;
calld->transport_op = *op;
}
}
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 92daab9bf1..84a883390c 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -241,7 +241,6 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
}
grpc_resolved_addresses_destroy(resolved);
-
/* Register with the server only upon success */
grpc_server_add_listener(&exec_ctx, server, state, start, destroy);
diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c
index cd0afddf9d..9daecd2e18 100644
--- a/src/core/support/stack_lockfree.c
+++ b/src/core/support/stack_lockfree.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -99,6 +99,11 @@ gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) {
/* Point the head at reserved dummy entry */
stack->head.contents.index = INVALID_ENTRY_INDEX;
+/* Fill in the pad and aba_ctr to avoid confusing memcheck tools */
+#ifdef GPR_ARCH_64
+ stack->head.contents.pad = 0;
+#endif
+ stack->head.contents.aba_ctr = 0;
return stack;
}
@@ -115,6 +120,11 @@ int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) {
/* First fill in the entry's index and aba ctr for new head */
newhead.contents.index = (uint16_t)entry;
+#ifdef GPR_ARCH_64
+ /* Fill in the pad to avoid confusing memcheck tools */
+ newhead.contents.pad = 0;
+#endif
+
/* Also post-increment the aba_ctr */
curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm);
newhead.contents.aba_ctr = ++curent.contents.aba_ctr;
diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c
index 84d412a75f..41998ebcb6 100644
--- a/src/core/support/sync_win32.c
+++ b/src/core/support/sync_win32.c
@@ -87,6 +87,7 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) {
0) {
SleepConditionVariableCS(cv, &mu->cs, INFINITE);
} else {
+ abs_deadline = gpr_convert_clock_type(abs_deadline, GPR_CLOCK_REALTIME);
gpr_timespec now = gpr_now(abs_deadline.clock_type);
int64_t now_ms = (int64_t)now.tv_sec * 1000 + now.tv_nsec / 1000000;
int64_t deadline_ms =
diff --git a/src/core/support/time.c b/src/core/support/time.c
index ac8c3bcde5..423d12ffc0 100644
--- a/src/core/support/time.c
+++ b/src/core/support/time.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -83,12 +83,12 @@ gpr_timespec gpr_inf_past(gpr_clock_type type) {
/* TODO(ctiller): consider merging _nanos, _micros, _millis into a single
function for maintainability. Similarly for _seconds, _minutes, and _hours */
-gpr_timespec gpr_time_from_nanos(long ns, gpr_clock_type type) {
+gpr_timespec gpr_time_from_nanos(int64_t ns, gpr_clock_type type) {
gpr_timespec result;
result.clock_type = type;
- if (ns == LONG_MAX) {
+ if (ns == INT64_MAX) {
result = gpr_inf_future(type);
- } else if (ns == LONG_MIN) {
+ } else if (ns == INT64_MIN) {
result = gpr_inf_past(type);
} else if (ns >= 0) {
result.tv_sec = ns / GPR_NS_PER_SEC;
@@ -101,12 +101,12 @@ gpr_timespec gpr_time_from_nanos(long ns, gpr_clock_type type) {
return result;
}
-gpr_timespec gpr_time_from_micros(long us, gpr_clock_type type) {
+gpr_timespec gpr_time_from_micros(int64_t us, gpr_clock_type type) {
gpr_timespec result;
result.clock_type = type;
- if (us == LONG_MAX) {
+ if (us == INT64_MAX) {
result = gpr_inf_future(type);
- } else if (us == LONG_MIN) {
+ } else if (us == INT64_MIN) {
result = gpr_inf_past(type);
} else if (us >= 0) {
result.tv_sec = us / 1000000;
@@ -119,12 +119,12 @@ gpr_timespec gpr_time_from_micros(long us, gpr_clock_type type) {
return result;
}
-gpr_timespec gpr_time_from_millis(long ms, gpr_clock_type type) {
+gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type type) {
gpr_timespec result;
result.clock_type = type;
- if (ms == LONG_MAX) {
+ if (ms == INT64_MAX) {
result = gpr_inf_future(type);
- } else if (ms == LONG_MIN) {
+ } else if (ms == INT64_MIN) {
result = gpr_inf_past(type);
} else if (ms >= 0) {
result.tv_sec = ms / 1000;
@@ -137,12 +137,12 @@ gpr_timespec gpr_time_from_millis(long ms, gpr_clock_type type) {
return result;
}
-gpr_timespec gpr_time_from_seconds(long s, gpr_clock_type type) {
+gpr_timespec gpr_time_from_seconds(int64_t s, gpr_clock_type type) {
gpr_timespec result;
result.clock_type = type;
- if (s == LONG_MAX) {
+ if (s == INT64_MAX) {
result = gpr_inf_future(type);
- } else if (s == LONG_MIN) {
+ } else if (s == INT64_MIN) {
result = gpr_inf_past(type);
} else {
result.tv_sec = s;
@@ -151,12 +151,12 @@ gpr_timespec gpr_time_from_seconds(long s, gpr_clock_type type) {
return result;
}
-gpr_timespec gpr_time_from_minutes(long m, gpr_clock_type type) {
+gpr_timespec gpr_time_from_minutes(int64_t m, gpr_clock_type type) {
gpr_timespec result;
result.clock_type = type;
- if (m >= LONG_MAX / 60) {
+ if (m >= INT64_MAX / 60) {
result = gpr_inf_future(type);
- } else if (m <= LONG_MIN / 60) {
+ } else if (m <= INT64_MIN / 60) {
result = gpr_inf_past(type);
} else {
result.tv_sec = m * 60;
@@ -165,12 +165,12 @@ gpr_timespec gpr_time_from_minutes(long m, gpr_clock_type type) {
return result;
}
-gpr_timespec gpr_time_from_hours(long h, gpr_clock_type type) {
+gpr_timespec gpr_time_from_hours(int64_t h, gpr_clock_type type) {
gpr_timespec result;
result.clock_type = type;
- if (h >= LONG_MAX / 3600) {
+ if (h >= INT64_MAX / 3600) {
result = gpr_inf_future(type);
- } else if (h <= LONG_MIN / 3600) {
+ } else if (h <= INT64_MIN / 3600) {
result = gpr_inf_past(type);
} else {
result.tv_sec = h * 3600;
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 9495e748b5..1b117aa6b8 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -159,6 +159,9 @@ struct grpc_call {
uint8_t receiving_message;
uint8_t received_final_op;
+ /* have we received initial metadata */
+ bool has_initial_md_been_received;
+
batch_control active_batches[MAX_CONCURRENT_BATCHES];
/* first idx: is_receiving, second idx: is_trailing */
@@ -200,6 +203,7 @@ struct grpc_call {
gpr_slice receiving_slice;
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
+ grpc_closure receiving_initial_metadata_ready;
uint32_t test_only_last_message_flags;
union {
@@ -212,6 +216,11 @@ struct grpc_call {
int *cancelled;
} server;
} final_op;
+
+ struct {
+ void *bctlp;
+ bool success;
+ } saved_receiving_stream_ready_ctx;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -993,6 +1002,94 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
+static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
+ bool success) {
+ grpc_call *call = bctl->call;
+ if (call->receiving_stream == NULL) {
+ *call->receiving_buffer = NULL;
+ call->receiving_message = 0;
+ if (gpr_unref(&bctl->steps_to_complete)) {
+ post_batch_completion(exec_ctx, bctl);
+ }
+ } else if (call->receiving_stream->length >
+ grpc_channel_get_max_message_length(call->channel)) {
+ cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
+ "Max message size exceeded");
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ call->receiving_stream = NULL;
+ *call->receiving_buffer = NULL;
+ call->receiving_message = 0;
+ if (gpr_unref(&bctl->steps_to_complete)) {
+ post_batch_completion(exec_ctx, bctl);
+ }
+ } else {
+ call->test_only_last_message_flags = call->receiving_stream->flags;
+ if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+ *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
+ NULL, 0, call->compression_algorithm);
+ } else {
+ *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
+ }
+ grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
+ bctl);
+ continue_receiving_slices(exec_ctx, bctl);
+ /* early out */
+ return;
+ }
+}
+
+static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
+ bool success) {
+ batch_control *bctl = bctlp;
+ grpc_call *call = bctl->call;
+
+ gpr_mu_lock(&bctl->call->mu);
+ if (bctl->call->has_initial_md_been_received) {
+ gpr_mu_unlock(&bctl->call->mu);
+ process_data_after_md(exec_ctx, bctlp, success);
+ } else {
+ call->saved_receiving_stream_ready_ctx.bctlp = bctlp;
+ call->saved_receiving_stream_ready_ctx.success = success;
+ gpr_mu_unlock(&bctl->call->mu);
+ }
+}
+
+static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
+ void *bctlp, bool success) {
+ batch_control *bctl = bctlp;
+ grpc_call *call = bctl->call;
+
+ gpr_mu_lock(&call->mu);
+
+ grpc_metadata_batch *md =
+ &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+ grpc_metadata_batch_filter(md, recv_initial_filter, call);
+ call->has_initial_md_been_received = true;
+
+ if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
+ 0 &&
+ !call->is_client) {
+ GPR_TIMER_BEGIN("set_deadline_alarm", 0);
+ set_deadline_alarm(exec_ctx, call, md->deadline);
+ GPR_TIMER_END("set_deadline_alarm", 0);
+ }
+
+ if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) {
+ grpc_closure *saved_rsr_closure = grpc_closure_create(
+ receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp);
+ grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure,
+ call->saved_receiving_stream_ready_ctx.success, NULL);
+ call->saved_receiving_stream_ready_ctx.bctlp = NULL;
+ }
+
+ gpr_mu_unlock(&call->mu);
+
+ if (gpr_unref(&bctl->steps_to_complete)) {
+ post_batch_completion(exec_ctx, bctl);
+ }
+}
+
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
@@ -1011,19 +1108,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
- if (bctl->recv_initial_metadata) {
- grpc_metadata_batch *md =
- &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- grpc_metadata_batch_filter(md, recv_initial_filter, call);
-
- if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
- 0 &&
- !call->is_client) {
- GPR_TIMER_BEGIN("set_deadline_alarm", 0);
- set_deadline_alarm(exec_ctx, call, md->deadline);
- GPR_TIMER_END("set_deadline_alarm", 0);
- }
- }
if (bctl->recv_final_op) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
@@ -1065,45 +1149,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
}
}
-static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- bool success) {
- batch_control *bctl = bctlp;
- grpc_call *call = bctl->call;
-
- if (call->receiving_stream == NULL) {
- *call->receiving_buffer = NULL;
- call->receiving_message = 0;
- if (gpr_unref(&bctl->steps_to_complete)) {
- post_batch_completion(exec_ctx, bctl);
- }
- } else if (call->receiving_stream->length >
- grpc_channel_get_max_message_length(call->channel)) {
- cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
- "Max message size exceeded");
- grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
- call->receiving_stream = NULL;
- *call->receiving_buffer = NULL;
- call->receiving_message = 0;
- if (gpr_unref(&bctl->steps_to_complete)) {
- post_batch_completion(exec_ctx, bctl);
- }
- } else {
- call->test_only_last_message_flags = call->receiving_stream->flags;
- if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
- (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
- *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
- NULL, 0, call->compression_algorithm);
- } else {
- *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
- }
- grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
- bctl);
- continue_receiving_slices(exec_ctx, bctl);
- /* early out */
- return;
- }
-}
-
static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
@@ -1273,9 +1318,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
call->received_initial_metadata = 1;
call->buffered_metadata[0] = op->data.recv_initial_metadata;
+ grpc_closure_init(&call->receiving_initial_metadata_ready,
+ receiving_initial_metadata_ready, bctl);
bctl->recv_initial_metadata = 1;
stream_op.recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+ stream_op.recv_initial_metadata_ready =
+ &call->receiving_initial_metadata_ready;
+ num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_MESSAGE:
/* Flag validation: currently allow no flags */
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 705996cad3..537069e984 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -78,8 +78,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
} else if (op->recv_trailing_metadata != NULL) {
fill_metadata(elem, op->recv_trailing_metadata);
}
- grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
- grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
}
static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 42cffccb4c..fb5e0d4b9e 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -596,8 +596,8 @@ static void server_mutate_op(grpc_call_element *elem,
if (op->recv_initial_metadata != NULL) {
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv_initial_metadata = op->on_complete;
- op->on_complete = &calld->server_on_recv_initial_metadata;
+ calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata;
}
}
diff --git a/src/core/surface/version.c b/src/core/surface/version.c
index 262a13f184..7723f39401 100644
--- a/src/core/surface/version.c
+++ b/src/core/surface/version.c
@@ -36,4 +36,4 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "0.13.0.0"; }
+const char *grpc_version_string(void) { return "0.14.0-dev"; }
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index c611496e7e..0e1e2c4265 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -385,7 +385,7 @@ typedef struct {
grpc_closure *send_trailing_metadata_finished;
grpc_metadata_batch *recv_initial_metadata;
- grpc_closure *recv_initial_metadata_finished;
+ grpc_closure *recv_initial_metadata_ready;
grpc_byte_stream **recv_message;
grpc_closure *recv_message_ready;
grpc_metadata_batch *recv_trailing_metadata;
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 095883c66d..cafecf1046 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -75,6 +75,9 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
+ bool is_window_available = transport_writing->outgoing_window > 0;
+ grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing,
+ is_window_available);
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
@@ -329,10 +332,6 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
- bool is_window_available = transport_writing->outgoing_window > 0;
-
- grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing,
- is_window_available);
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 9298573c7f..617d98875c 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -544,7 +544,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
GPR_ASSERT(s->global.send_message_finished == NULL);
GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL);
- GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
+ GPR_ASSERT(s->global.recv_initial_metadata_ready == NULL);
GPR_ASSERT(s->global.recv_message_ready == NULL);
GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
@@ -863,9 +863,9 @@ static void perform_stream_op_locked(
}
if (op->recv_initial_metadata != NULL) {
- GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL);
- stream_global->recv_initial_metadata_finished =
- add_closure_barrier(on_complete);
+ GPR_ASSERT(stream_global->recv_initial_metadata_ready == NULL);
+ stream_global->recv_initial_metadata_ready =
+ op->recv_initial_metadata_ready;
stream_global->recv_initial_metadata = op->recv_initial_metadata;
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
@@ -1009,13 +1009,14 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs;
while (
grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) {
- if (stream_global->recv_initial_metadata_finished != NULL &&
+ if (stream_global->recv_initial_metadata_ready != NULL &&
stream_global->published_initial_metadata) {
grpc_chttp2_incoming_metadata_buffer_publish(
&stream_global->received_initial_metadata,
stream_global->recv_initial_metadata);
- grpc_chttp2_complete_closure_step(
- exec_ctx, &stream_global->recv_initial_metadata_finished, 1);
+ grpc_exec_ctx_enqueue(
+ exec_ctx, stream_global->recv_initial_metadata_ready, true, NULL);
+ stream_global->recv_initial_metadata_ready = NULL;
}
if (stream_global->recv_message_ready != NULL) {
if (stream_global->incoming_frames.head != NULL) {
diff --git a/src/core/transport/static_metadata.c b/src/core/transport/static_metadata.c
index 233e5c0d92..eeedae0619 100644
--- a/src/core/transport/static_metadata.c
+++ b/src/core/transport/static_metadata.c
@@ -35,11 +35,11 @@
* WARNING: Auto-generated code.
*
* To make changes to this file, change
- *tools/codegen/core/gen_static_metadata.py,
+ * tools/codegen/core/gen_static_metadata.py,
* and then re-run it.
*
* See metadata.h for an explanation of the interface here, and metadata.c for
- *an
+ * an
* explanation of what's going on.
*/
@@ -69,21 +69,21 @@ const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = {
"0", "1", "2", "200", "204", "206", "304", "400", "404", "500", "accept",
"accept-charset", "accept-encoding", "accept-language", "accept-ranges",
"access-control-allow-origin", "age", "allow", "application/grpc",
- ":authority", "authorization", "cache-control", "census", "census-bin",
- "content-disposition", "content-encoding", "content-language",
- "content-length", "content-location", "content-range", "content-type",
- "cookie", "date", "deflate", "deflate,gzip", "", "etag", "expect",
- "expires", "from", "GET", "grpc", "grpc-accept-encoding", "grpc-encoding",
- "grpc-internal-encoding-request", "grpc-message", "grpc-status",
- "grpc-timeout", "gzip", "gzip, deflate", "host", "http", "https",
- "identity", "identity,deflate", "identity,deflate,gzip", "identity,gzip",
- "if-match", "if-modified-since", "if-none-match", "if-range",
- "if-unmodified-since", "last-modified", "link", "location", "max-forwards",
- ":method", ":path", "POST", "proxy-authenticate", "proxy-authorization",
- "range", "referer", "refresh", "retry-after", ":scheme", "server",
- "set-cookie", "/", "/index.html", ":status", "strict-transport-security",
- "te", "trailers", "transfer-encoding", "user-agent", "vary", "via",
- "www-authenticate"};
+ ":authority", "authorization", "cache-control", "census-bin",
+ "census-binary-bin", "content-disposition", "content-encoding",
+ "content-language", "content-length", "content-location", "content-range",
+ "content-type", "cookie", "date", "deflate", "deflate,gzip", "", "etag",
+ "expect", "expires", "from", "GET", "grpc", "grpc-accept-encoding",
+ "grpc-encoding", "grpc-internal-encoding-request", "grpc-message",
+ "grpc-status", "grpc-timeout", "gzip", "gzip, deflate", "host", "http",
+ "https", "identity", "identity,deflate", "identity,deflate,gzip",
+ "identity,gzip", "if-match", "if-modified-since", "if-none-match",
+ "if-range", "if-unmodified-since", "last-modified", "link", "location",
+ "max-forwards", ":method", ":path", "POST", "proxy-authenticate",
+ "proxy-authorization", "range", "referer", "refresh", "retry-after",
+ ":scheme", "server", "set-cookie", "/", "/index.html", ":status",
+ "strict-transport-security", "te", "trailers", "transfer-encoding",
+ "user-agent", "vary", "via", "www-authenticate"};
const uint8_t grpc_static_accept_encoding_metadata[8] = {0, 29, 26, 30,
28, 32, 27, 31};
diff --git a/src/core/transport/static_metadata.h b/src/core/transport/static_metadata.h
index 3803a0488b..ef72b802b5 100644
--- a/src/core/transport/static_metadata.h
+++ b/src/core/transport/static_metadata.h
@@ -94,10 +94,10 @@ extern grpc_mdstr grpc_static_mdstr_table[GRPC_STATIC_MDSTR_COUNT];
#define GRPC_MDSTR_AUTHORIZATION (&grpc_static_mdstr_table[20])
/* "cache-control" */
#define GRPC_MDSTR_CACHE_CONTROL (&grpc_static_mdstr_table[21])
-/* "census" */
-#define GRPC_MDSTR_CENSUS (&grpc_static_mdstr_table[22])
/* "census-bin" */
-#define GRPC_MDSTR_CENSUS_BIN (&grpc_static_mdstr_table[23])
+#define GRPC_MDSTR_CENSUS_BIN (&grpc_static_mdstr_table[22])
+/* "census-binary-bin" */
+#define GRPC_MDSTR_CENSUS_BINARY_BIN (&grpc_static_mdstr_table[23])
/* "content-disposition" */
#define GRPC_MDSTR_CONTENT_DISPOSITION (&grpc_static_mdstr_table[24])
/* "content-encoding" */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 08d685668c..6e154b629a 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -126,6 +126,7 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) {
grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, op->recv_initial_metadata_ready, false, NULL);
grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index f5cac77adc..8902c5d2f6 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -92,6 +92,8 @@ typedef struct grpc_transport_stream_op {
/** Receive initial metadata from the stream, into provided metadata batch. */
grpc_metadata_batch *recv_initial_metadata;
+ /** Should be enqueued when initial metadata is ready to be processed. */
+ grpc_closure *recv_initial_metadata_ready;
/** Receive message data from the stream, into provided byte stream. */
grpc_byte_stream **recv_message;
@@ -103,7 +105,8 @@ typedef struct grpc_transport_stream_op {
grpc_metadata_batch *recv_trailing_metadata;
/** Should be enqueued when all requested operations (excluding recv_message
- which has its own closure) in a given batch have been completed. */
+ and recv_initial_metadata which have their own closures) in a given batch
+ have been completed. */
grpc_closure *on_complete;
/** If != GRPC_STATUS_OK, cancel this stream */