aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-13 13:34:58 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-03-13 13:34:58 -0700
commit77af9d832de23246bcc07bc814e3d989413db079 (patch)
tree0dd6f1f6516294e461c08d7a5121690d8ec158ed /src/core/lib
parent9d3203a5ea195801eb7358beced8c2f472a7d9b2 (diff)
parentc5591f5143b6fb34f2f4ce6ae5d4ea18d9d65bf3 (diff)
Merge github.com:grpc/grpc into trim-the-locks
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/http/parser.c4
-rw-r--r--src/core/lib/iomgr/error.c422
-rw-r--r--src/core/lib/iomgr/error.h11
-rw-r--r--src/core/lib/iomgr/error_internal.h24
-rw-r--r--src/core/lib/iomgr/pollset_uv.c22
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c52
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.c12
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c1
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c19
-rw-r--r--src/core/lib/iomgr/udp_server.c6
-rw-r--r--src/core/lib/iomgr/wakeup_fd_posix.h2
-rw-r--r--src/core/lib/support/sync.c12
-rw-r--r--src/core/lib/surface/call.c4
-rw-r--r--src/core/lib/transport/error_utils.c22
-rw-r--r--src/core/lib/transport/transport.c35
-rw-r--r--src/core/lib/transport/transport.h6
-rw-r--r--src/core/lib/tsi/test_creds/BUILD2
17 files changed, 472 insertions, 184 deletions
diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c
index 2f84adc187..b9c56c103c 100644
--- a/src/core/lib/http/parser.c
+++ b/src/core/lib/http/parser.c
@@ -284,9 +284,9 @@ static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte,
case GRPC_HTTP_HEADERS:
if (parser->cur_line_length >= GRPC_HTTP_PARSER_MAX_HEADER_LENGTH) {
if (grpc_http1_trace)
- gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded",
+ gpr_log(GPR_ERROR, "HTTP header max line length (%d) exceeded",
GRPC_HTTP_PARSER_MAX_HEADER_LENGTH);
- return GRPC_ERROR_NONE;
+ return GRPC_ERROR_CREATE("HTTP header max line length exceeded");
}
parser->cur_line[parser->cur_line_length] = byte;
parser->cur_line_length++;
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index dbe5b139f9..7cdbe30198 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -35,6 +35,7 @@
#include <string.h>
+#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -47,46 +48,7 @@
#include "src/core/lib/iomgr/error_internal.h"
#include "src/core/lib/profiling/timers.h"
-
-static void destroy_integer(void *key) {}
-
-static void *copy_integer(void *key) { return key; }
-
-static long compare_integers(void *key1, void *key2) {
- return GPR_ICMP((uintptr_t)key1, (uintptr_t)key2);
-}
-
-static void destroy_string(void *str) { gpr_free(str); }
-
-static void *copy_string(void *str) { return gpr_strdup(str); }
-
-static void destroy_err(void *err) { GRPC_ERROR_UNREF(err); }
-
-static void *copy_err(void *err) { return GRPC_ERROR_REF(err); }
-
-static void destroy_time(void *tm) { gpr_free(tm); }
-
-static gpr_timespec *box_time(gpr_timespec tm) {
- gpr_timespec *out = gpr_malloc(sizeof(*out));
- *out = tm;
- return out;
-}
-
-static void *copy_time(void *tm) { return box_time(*(gpr_timespec *)tm); }
-
-static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer,
- compare_integers,
- destroy_integer, copy_integer};
-
-static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer,
- compare_integers, destroy_string,
- copy_string};
-
-static const gpr_avl_vtable avl_vtable_times = {
- destroy_integer, copy_integer, compare_integers, destroy_time, copy_time};
-
-static const gpr_avl_vtable avl_vtable_errs = {
- destroy_integer, copy_integer, compare_integers, destroy_err, copy_err};
+#include "src/core/lib/slice/slice_internal.h"
static const char *error_int_name(grpc_error_ints key) {
switch (key) {
@@ -120,6 +82,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "limit";
case GRPC_ERROR_INT_OCCURRED_DURING_WRITE:
return "occurred_during_write";
+ case GRPC_ERROR_INT_MAX:
+ GPR_UNREACHABLE_CODE(return "unknown");
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -150,6 +114,8 @@ static const char *error_str_name(grpc_error_strs key) {
return "filename";
case GRPC_ERROR_STR_QUEUED_BUFFERS:
return "queued_buffers";
+ case GRPC_ERROR_STR_MAX:
+ GPR_UNREACHABLE_CODE(return "unknown");
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -158,6 +124,8 @@ static const char *error_time_name(grpc_error_times key) {
switch (key) {
case GRPC_ERROR_TIME_CREATED:
return "created";
+ case GRPC_ERROR_TIME_MAX:
+ GPR_UNREACHABLE_CODE(return "unknown");
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -184,12 +152,36 @@ grpc_error *grpc_error_ref(grpc_error *err) {
}
#endif
+static void unref_errs(grpc_error *err) {
+ uint8_t slot = err->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot);
+ GRPC_ERROR_UNREF(lerr->err);
+ GPR_ASSERT(err->last_err == slot ? lerr->next == UINT8_MAX
+ : lerr->next != UINT8_MAX);
+ slot = lerr->next;
+ }
+}
+
+static void unref_slice(grpc_slice slice) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_unref_internal(&exec_ctx, slice);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void unref_strs(grpc_error *err) {
+ for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) {
+ uint8_t slot = err->strs[which];
+ if (slot != UINT8_MAX) {
+ unref_slice(*(grpc_slice *)(err->arena + slot));
+ }
+ }
+}
+
static void error_destroy(grpc_error *err) {
GPR_ASSERT(!grpc_error_is_special(err));
- gpr_avl_unref(err->ints);
- gpr_avl_unref(err->strs);
- gpr_avl_unref(err->errs);
- gpr_avl_unref(err->times);
+ unref_errs(err);
+ unref_strs(err);
gpr_free((void *)gpr_atm_acq_load(&err->error_string));
gpr_free(err);
}
@@ -213,67 +205,189 @@ void grpc_error_unref(grpc_error *err) {
}
#endif
+static uint8_t get_placement(grpc_error **err, size_t size) {
+ GPR_ASSERT(*err);
+ uint8_t slots = (uint8_t)(size / sizeof(intptr_t));
+ if ((*err)->arena_size + slots > (*err)->arena_capacity) {
+ (*err)->arena_capacity = (uint8_t)(3 * (*err)->arena_capacity / 2);
+ *err = gpr_realloc(
+ *err, sizeof(grpc_error) + (*err)->arena_capacity * sizeof(intptr_t));
+ }
+ uint8_t placement = (*err)->arena_size;
+ (*err)->arena_size = (uint8_t)((*err)->arena_size + slots);
+ return placement;
+}
+
+static void internal_set_int(grpc_error **err, grpc_error_ints which,
+ intptr_t value) {
+ // GPR_ASSERT((*err)->ints[which] == UINT8_MAX); // TODO, enforce this
+ uint8_t slot = (*err)->ints[which];
+ if (slot == UINT8_MAX) {
+ slot = get_placement(err, sizeof(value));
+ }
+ (*err)->ints[which] = slot;
+ (*err)->arena[slot] = value;
+}
+
+static void internal_set_str(grpc_error **err, grpc_error_strs which,
+ grpc_slice value) {
+ // GPR_ASSERT((*err)->strs[which] == UINT8_MAX); // TODO, enforce this
+ uint8_t slot = (*err)->strs[which];
+ if (slot == UINT8_MAX) {
+ slot = get_placement(err, sizeof(value));
+ } else {
+ unref_slice(*(grpc_slice *)((*err)->arena + slot));
+ }
+ (*err)->strs[which] = slot;
+ memcpy((*err)->arena + slot, &value, sizeof(value));
+}
+
+static void internal_set_time(grpc_error **err, grpc_error_times which,
+ gpr_timespec value) {
+ // GPR_ASSERT((*err)->times[which] == UINT8_MAX); // TODO, enforce this
+ uint8_t slot = (*err)->times[which];
+ if (slot == UINT8_MAX) {
+ slot = get_placement(err, sizeof(value));
+ }
+ (*err)->times[which] = slot;
+ memcpy((*err)->arena + slot, &value, sizeof(value));
+}
+
+static void internal_add_error(grpc_error **err, grpc_error *new) {
+ grpc_linked_error new_last = {new, UINT8_MAX};
+ uint8_t slot = get_placement(err, sizeof(grpc_linked_error));
+ if ((*err)->first_err == UINT8_MAX) {
+ GPR_ASSERT((*err)->last_err == UINT8_MAX);
+ (*err)->last_err = slot;
+ (*err)->first_err = slot;
+ } else {
+ GPR_ASSERT((*err)->last_err != UINT8_MAX);
+ grpc_linked_error *old_last =
+ (grpc_linked_error *)((*err)->arena + (*err)->last_err);
+ old_last->next = slot;
+ (*err)->last_err = slot;
+ }
+ memcpy((*err)->arena + slot, &new_last, sizeof(grpc_linked_error));
+}
+
+#define SLOTS_PER_INT (sizeof(intptr_t) / sizeof(intptr_t))
+#define SLOTS_PER_STR (sizeof(grpc_slice) / sizeof(intptr_t))
+#define SLOTS_PER_TIME (sizeof(gpr_timespec) / sizeof(intptr_t))
+#define SLOTS_PER_LINKED_ERROR (sizeof(grpc_linked_error) / sizeof(intptr_t))
+
+// size of storing one int and two slices and a timespec. For line, desc, file,
+// and time created
+#define DEFAULT_ERROR_CAPACITY \
+ (SLOTS_PER_INT + (SLOTS_PER_STR * 2) + SLOTS_PER_TIME)
+
+// It is very common to include and extra int and string in an error
+#define SURPLUS_CAPACITY (2 * SLOTS_PER_INT + SLOTS_PER_TIME)
+
grpc_error *grpc_error_create(const char *file, int line, const char *desc,
grpc_error **referencing,
size_t num_referencing) {
GPR_TIMER_BEGIN("grpc_error_create", 0);
- grpc_error *err = gpr_malloc(sizeof(*err));
+ uint8_t initial_arena_capacity = (uint8_t)(
+ DEFAULT_ERROR_CAPACITY +
+ (uint8_t)(num_referencing * SLOTS_PER_LINKED_ERROR) + SURPLUS_CAPACITY);
+ grpc_error *err =
+ gpr_malloc(sizeof(*err) + initial_arena_capacity * sizeof(intptr_t));
if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL
return GRPC_ERROR_OOM;
}
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line);
#endif
- err->ints = gpr_avl_add(gpr_avl_create(&avl_vtable_ints),
- (void *)(uintptr_t)GRPC_ERROR_INT_FILE_LINE,
- (void *)(uintptr_t)line);
- err->strs = gpr_avl_add(
- gpr_avl_add(gpr_avl_create(&avl_vtable_strs),
- (void *)(uintptr_t)GRPC_ERROR_STR_FILE, gpr_strdup(file)),
- (void *)(uintptr_t)GRPC_ERROR_STR_DESCRIPTION, gpr_strdup(desc));
- err->errs = gpr_avl_create(&avl_vtable_errs);
- err->next_err = 0;
- for (size_t i = 0; i < num_referencing; i++) {
+
+ err->arena_size = 0;
+ err->arena_capacity = initial_arena_capacity;
+ err->first_err = UINT8_MAX;
+ err->last_err = UINT8_MAX;
+
+ memset(err->ints, UINT8_MAX, GRPC_ERROR_INT_MAX);
+ memset(err->strs, UINT8_MAX, GRPC_ERROR_STR_MAX);
+ memset(err->times, UINT8_MAX, GRPC_ERROR_TIME_MAX);
+
+ internal_set_int(&err, GRPC_ERROR_INT_FILE_LINE, line);
+ internal_set_str(&err, GRPC_ERROR_STR_FILE,
+ grpc_slice_from_static_string(file));
+ internal_set_str(
+ &err, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_copied_buffer(
+ desc,
+ strlen(desc) +
+ 1)); // TODO, pull this up. // TODO(ncteisen), pull this up.
+
+ for (size_t i = 0; i < num_referencing; ++i) {
if (referencing[i] == GRPC_ERROR_NONE) continue;
- err->errs = gpr_avl_add(err->errs, (void *)(err->next_err++),
- GRPC_ERROR_REF(referencing[i]));
+ internal_add_error(
+ &err,
+ GRPC_ERROR_REF(
+ referencing[i])); // TODO(ncteisen), change ownership semantics
}
- err->times = gpr_avl_add(gpr_avl_create(&avl_vtable_times),
- (void *)(uintptr_t)GRPC_ERROR_TIME_CREATED,
- box_time(gpr_now(GPR_CLOCK_REALTIME)));
+
+ internal_set_time(&err, GRPC_ERROR_TIME_CREATED, gpr_now(GPR_CLOCK_REALTIME));
+
gpr_atm_no_barrier_store(&err->error_string, 0);
gpr_ref_init(&err->refs, 1);
GPR_TIMER_END("grpc_error_create", 0);
return err;
}
+static void ref_strs(grpc_error *err) {
+ for (size_t i = 0; i < GRPC_ERROR_STR_MAX; ++i) {
+ uint8_t slot = err->strs[i];
+ if (slot != UINT8_MAX) {
+ grpc_slice_ref_internal(*(grpc_slice *)(err->arena + slot));
+ }
+ }
+}
+
+static void ref_errs(grpc_error *err) {
+ uint8_t slot = err->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot);
+ GRPC_ERROR_REF(lerr->err);
+ slot = lerr->next;
+ }
+}
+
static grpc_error *copy_error_and_unref(grpc_error *in) {
GPR_TIMER_BEGIN("copy_error_and_unref", 0);
grpc_error *out;
if (grpc_error_is_special(in)) {
- if (in == GRPC_ERROR_NONE)
- out = grpc_error_set_int(GRPC_ERROR_CREATE("no error"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK);
- else if (in == GRPC_ERROR_OOM)
- out = GRPC_ERROR_CREATE("oom");
- else if (in == GRPC_ERROR_CANCELLED)
- out =
- grpc_error_set_int(GRPC_ERROR_CREATE("cancelled"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED);
- else
- out = GRPC_ERROR_CREATE("unknown");
+ out = GRPC_ERROR_CREATE("unknown");
+ if (in == GRPC_ERROR_NONE) {
+ internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_static_string("no error"));
+ internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK);
+ } else if (in == GRPC_ERROR_OOM) {
+ internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_static_string("oom"));
+ } else if (in == GRPC_ERROR_CANCELLED) {
+ internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_static_string("cancelled"));
+ internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED);
+ }
+ } else if (gpr_ref_is_unique(&in->refs)) {
+ out = in;
} else {
- out = gpr_malloc(sizeof(*out));
+ uint8_t new_arena_capacity = in->arena_capacity;
+ // the returned err will be added to, so we ensure this is room to avoid
+ // unneeded allocations.
+ if (in->arena_capacity - in->arena_size < (uint8_t)SLOTS_PER_STR) {
+ new_arena_capacity = (uint8_t)(3 * new_arena_capacity / 2);
+ }
+ out = gpr_malloc(sizeof(*in) + new_arena_capacity * sizeof(intptr_t));
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "%p create copying %p", out, in);
#endif
- out->ints = gpr_avl_ref(in->ints);
- out->strs = gpr_avl_ref(in->strs);
- out->errs = gpr_avl_ref(in->errs);
- out->times = gpr_avl_ref(in->times);
+ memcpy(out, in, sizeof(*in) + in->arena_size * sizeof(intptr_t));
+ out->arena_capacity = new_arena_capacity;
gpr_atm_no_barrier_store(&out->error_string, 0);
- out->next_err = in->next_err;
gpr_ref_init(&out->refs, 1);
+ ref_strs(out);
+ ref_errs(out);
GRPC_ERROR_UNREF(in);
}
GPR_TIMER_END("copy_error_and_unref", 0);
@@ -284,7 +398,7 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value) {
GPR_TIMER_BEGIN("grpc_error_set_int", 0);
grpc_error *new = copy_error_and_unref(src);
- new->ints = gpr_avl_add(new->ints, (void *)(uintptr_t)which, (void *)value);
+ internal_set_int(&new, which, value);
GPR_TIMER_END("grpc_error_set_int", 0);
return new;
}
@@ -302,7 +416,6 @@ static special_error_status_map error_status_map[] = {
bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) {
GPR_TIMER_BEGIN("grpc_error_get_int", 0);
- void *pp;
if (grpc_error_is_special(err)) {
if (which == GRPC_ERROR_INT_GRPC_STATUS) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); i++) {
@@ -316,8 +429,9 @@ bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) {
GPR_TIMER_END("grpc_error_get_int", 0);
return false;
}
- if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) {
- if (p != NULL) *p = (intptr_t)pp;
+ uint8_t slot = err->ints[which];
+ if (slot != UINT8_MAX) {
+ if (p != NULL) *p = err->arena[slot];
GPR_TIMER_END("grpc_error_get_int", 0);
return true;
}
@@ -329,8 +443,9 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value) {
GPR_TIMER_BEGIN("grpc_error_set_str", 0);
grpc_error *new = copy_error_and_unref(src);
- new->strs =
- gpr_avl_add(new->strs, (void *)(uintptr_t)which, gpr_strdup(value));
+ internal_set_str(&new, which,
+ grpc_slice_from_copied_buffer(
+ value, strlen(value) + 1)); // TODO, pull this up.
GPR_TIMER_END("grpc_error_set_str", 0);
return new;
}
@@ -346,13 +461,19 @@ const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) {
}
return NULL;
}
- return gpr_avl_get(err->strs, (void *)(uintptr_t)which);
+ uint8_t slot = err->strs[which];
+ if (slot != UINT8_MAX) {
+ return (const char *)GRPC_SLICE_START_PTR(
+ *(grpc_slice *)(err->arena + slot));
+ } else {
+ return NULL;
+ }
}
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
GPR_TIMER_BEGIN("grpc_error_add_child", 0);
grpc_error *new = copy_error_and_unref(src);
- new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child);
+ internal_add_error(&new, child);
GPR_TIMER_END("grpc_error_add_child", 0);
return new;
}
@@ -372,42 +493,6 @@ typedef struct {
size_t cap_kvs;
} kv_pairs;
-static void append_kv(kv_pairs *kvs, char *key, char *value) {
- if (kvs->num_kvs == kvs->cap_kvs) {
- kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4);
- kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs);
- }
- kvs->kvs[kvs->num_kvs].key = key;
- kvs->kvs[kvs->num_kvs].value = value;
- kvs->num_kvs++;
-}
-
-static void collect_kvs(gpr_avl_node *node, char *key(void *k),
- char *fmt(void *v), kv_pairs *kvs) {
- if (node == NULL) return;
- append_kv(kvs, key(node->key), fmt(node->value));
- collect_kvs(node->left, key, fmt, kvs);
- collect_kvs(node->right, key, fmt, kvs);
-}
-
-static char *key_int(void *p) {
- return gpr_strdup(error_int_name((grpc_error_ints)(uintptr_t)p));
-}
-
-static char *key_str(void *p) {
- return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p));
-}
-
-static char *key_time(void *p) {
- return gpr_strdup(error_time_name((grpc_error_times)(uintptr_t)p));
-}
-
-static char *fmt_int(void *p) {
- char *s;
- gpr_asprintf(&s, "%" PRIdPTR, (intptr_t)p);
- return s;
-}
-
static void append_chr(char c, char **s, size_t *sz, size_t *cap) {
if (*sz == *cap) {
*cap = GPR_MAX(8, 3 * *cap / 2);
@@ -459,6 +544,40 @@ static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) {
append_chr('"', s, sz, cap);
}
+static void append_kv(kv_pairs *kvs, char *key, char *value) {
+ if (kvs->num_kvs == kvs->cap_kvs) {
+ kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4);
+ kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs);
+ }
+ kvs->kvs[kvs->num_kvs].key = key;
+ kvs->kvs[kvs->num_kvs].value = value;
+ kvs->num_kvs++;
+}
+
+static char *key_int(grpc_error_ints which) {
+ return gpr_strdup(error_int_name(which));
+}
+
+static char *fmt_int(intptr_t p) {
+ char *s;
+ gpr_asprintf(&s, "%" PRIdPTR, p);
+ return s;
+}
+
+static void collect_ints_kvs(grpc_error *err, kv_pairs *kvs) {
+ for (size_t which = 0; which < GRPC_ERROR_INT_MAX; ++which) {
+ uint8_t slot = err->ints[which];
+ if (slot != UINT8_MAX) {
+ append_kv(kvs, key_int((grpc_error_ints)which),
+ fmt_int(err->arena[slot]));
+ }
+ }
+}
+
+static char *key_str(grpc_error_strs which) {
+ return gpr_strdup(error_str_name(which));
+}
+
static char *fmt_str(void *p) {
char *s = NULL;
size_t sz = 0;
@@ -468,8 +587,22 @@ static char *fmt_str(void *p) {
return s;
}
-static char *fmt_time(void *p) {
- gpr_timespec tm = *(gpr_timespec *)p;
+static void collect_strs_kvs(grpc_error *err, kv_pairs *kvs) {
+ for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) {
+ uint8_t slot = err->strs[which];
+ if (slot != UINT8_MAX) {
+ append_kv(
+ kvs, key_str((grpc_error_strs)which),
+ fmt_str(GRPC_SLICE_START_PTR(*(grpc_slice *)(err->arena + slot))));
+ }
+ }
+}
+
+static char *key_time(grpc_error_times which) {
+ return gpr_strdup(error_time_name(which));
+}
+
+static char *fmt_time(gpr_timespec tm) {
char *out;
char *pfx = "!!";
switch (tm.clock_type) {
@@ -490,24 +623,37 @@ static char *fmt_time(void *p) {
return out;
}
-static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap,
- bool *first) {
- if (n == NULL) return;
- add_errs(n->left, s, sz, cap, first);
- if (!*first) append_chr(',', s, sz, cap);
- *first = false;
- const char *e = grpc_error_string(n->value);
- append_str(e, s, sz, cap);
- add_errs(n->right, s, sz, cap, first);
+static void collect_times_kvs(grpc_error *err, kv_pairs *kvs) {
+ for (size_t which = 0; which < GRPC_ERROR_TIME_MAX; ++which) {
+ uint8_t slot = err->times[which];
+ if (slot != UINT8_MAX) {
+ append_kv(kvs, key_time((grpc_error_times)which),
+ fmt_time(*(gpr_timespec *)(err->arena + slot)));
+ }
+ }
+}
+
+static void add_errs(grpc_error *err, char **s, size_t *sz, size_t *cap) {
+ uint8_t slot = err->first_err;
+ bool first = true;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot);
+ if (!first) append_chr(',', s, sz, cap);
+ first = false;
+ const char *e = grpc_error_string(lerr->err);
+ append_str(e, s, sz, cap);
+ GPR_ASSERT(err->last_err == slot ? lerr->next == UINT8_MAX
+ : lerr->next != UINT8_MAX);
+ slot = lerr->next;
+ }
}
static char *errs_string(grpc_error *err) {
char *s = NULL;
size_t sz = 0;
size_t cap = 0;
- bool first = true;
append_chr('[', &s, &sz, &cap);
- add_errs(err->errs.root, &s, &sz, &cap, &first);
+ add_errs(err, &s, &sz, &cap);
append_chr(']', &s, &sz, &cap);
append_chr(0, &s, &sz, &cap);
return s;
@@ -555,10 +701,10 @@ const char *grpc_error_string(grpc_error *err) {
kv_pairs kvs;
memset(&kvs, 0, sizeof(kvs));
- collect_kvs(err->ints.root, key_int, fmt_int, &kvs);
- collect_kvs(err->strs.root, key_str, fmt_str, &kvs);
- collect_kvs(err->times.root, key_time, fmt_time, &kvs);
- if (!gpr_avl_is_empty(err->errs)) {
+ collect_ints_kvs(err, &kvs);
+ collect_strs_kvs(err, &kvs);
+ collect_times_kvs(err, &kvs);
+ if (err->first_err != UINT8_MAX) {
append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err));
}
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 2613512acb..eb953947ae 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -102,6 +102,9 @@ typedef enum {
GRPC_ERROR_INT_LIMIT,
/// chttp2: did the error occur while a write was in progress
GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
+
+ /// Must always be last
+ GRPC_ERROR_INT_MAX,
} grpc_error_ints;
typedef enum {
@@ -129,11 +132,17 @@ typedef enum {
GRPC_ERROR_STR_KEY,
/// value associated with the error
GRPC_ERROR_STR_VALUE,
+
+ /// Must always be last
+ GRPC_ERROR_STR_MAX,
} grpc_error_strs;
typedef enum {
/// timestamp of error creation
GRPC_ERROR_TIME_CREATED,
+
+ /// Must always be last
+ GRPC_ERROR_TIME_MAX,
} grpc_error_times;
/// The following "special" errors can be propagated without allocating memory.
@@ -184,8 +193,6 @@ void grpc_error_unref(grpc_error *err);
grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value) GRPC_MUST_USE_RESULT;
bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p);
-grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which,
- gpr_timespec value) GRPC_MUST_USE_RESULT;
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value) GRPC_MUST_USE_RESULT;
/// Returns NULL if the specified string is not set.
diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h
index 1c89ead4ed..fb4814e41f 100644
--- a/src/core/lib/iomgr/error_internal.h
+++ b/src/core/lib/iomgr/error_internal.h
@@ -35,18 +35,28 @@
#define GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H
#include <inttypes.h>
-#include <stdbool.h>
+#include <stdbool.h> // TODO, do we need this?
-#include <grpc/support/avl.h>
+#include <grpc/support/sync.h>
+
+typedef struct grpc_linked_error grpc_linked_error;
+
+struct grpc_linked_error {
+ grpc_error *err;
+ uint8_t next;
+};
struct grpc_error {
gpr_refcount refs;
- gpr_avl ints;
- gpr_avl strs;
- gpr_avl times;
- gpr_avl errs;
- uintptr_t next_err;
+ uint8_t ints[GRPC_ERROR_INT_MAX];
+ uint8_t strs[GRPC_ERROR_STR_MAX];
+ uint8_t times[GRPC_ERROR_TIME_MAX];
+ uint8_t first_err;
+ uint8_t last_err;
gpr_atm error_string;
+ uint8_t arena_size;
+ uint8_t arena_capacity;
+ intptr_t arena[0];
};
bool grpc_error_is_special(grpc_error *err);
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index af33949c69..a2f81bcd78 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -39,6 +39,7 @@
#include <string.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -61,25 +62,30 @@ gpr_mu grpc_polling_mu;
immediately in the next loop iteration.
Note: In the future, if there is a bug that involves missing wakeups in the
future, try adding a uv_async_t to kick the loop differently */
-uv_timer_t dummy_uv_handle;
+uv_timer_t *dummy_uv_handle;
size_t grpc_pollset_size() { return sizeof(grpc_pollset); }
void dummy_timer_cb(uv_timer_t *handle) {}
+void dummy_handle_close_cb(uv_handle_t *handle) { gpr_free(handle); }
+
void grpc_pollset_global_init(void) {
gpr_mu_init(&grpc_polling_mu);
- uv_timer_init(uv_default_loop(), &dummy_uv_handle);
+ dummy_uv_handle = gpr_malloc(sizeof(uv_timer_t));
+ uv_timer_init(uv_default_loop(), dummy_uv_handle);
grpc_pollset_work_run_loop = 1;
}
-static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
-
void grpc_pollset_global_shutdown(void) {
gpr_mu_destroy(&grpc_polling_mu);
- uv_close((uv_handle_t *)&dummy_uv_handle, timer_close_cb);
+ uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb);
}
+static void timer_run_cb(uv_timer_t *timer) {}
+
+static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
+
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &grpc_polling_mu;
uv_timer_init(uv_default_loop(), &pollset->timer);
@@ -95,7 +101,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
} else {
// kick the loop once
- uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0);
+ uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
}
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
}
@@ -111,8 +117,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
}
}
-static void timer_run_cb(uv_timer_t *timer) {}
-
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
@@ -145,7 +149,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
- uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0);
+ uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 79ff910738..4d715be94c 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -40,6 +40,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
@@ -54,8 +55,36 @@ typedef struct request {
grpc_closure *on_done;
grpc_resolved_addresses **addresses;
struct addrinfo *hints;
+ char *host;
+ char *port;
} request;
+static int retry_named_port_failure(int status, request *r,
+ uv_getaddrinfo_cb getaddrinfo_cb) {
+ if (status != 0) {
+ // This loop is copied from resolve_address_posix.c
+ char *svc[][2] = {{"http", "80"}, {"https", "443"}};
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(svc); i++) {
+ if (strcmp(r->port, svc[i][0]) == 0) {
+ int retry_status;
+ uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t));
+ req->data = r;
+ retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb,
+ r->host, svc[i][1], r->hints);
+ if (retry_status < 0 || getaddrinfo_cb == NULL) {
+ // The callback will not be called
+ gpr_free(req);
+ }
+ return retry_status;
+ }
+ }
+ }
+ /* If this function calls uv_getaddrinfo, it will return that function's
+ return value. That function only returns numbers <=0, so we can safely
+ return 1 to indicate that we never retried */
+ return 1;
+}
+
static grpc_error *handle_addrinfo_result(int status, struct addrinfo *result,
grpc_resolved_addresses **addresses) {
struct addrinfo *resp;
@@ -97,13 +126,21 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
request *r = (request *)req->data;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *error;
+ int retry_status;
+
+ gpr_free(req);
+ retry_status = retry_named_port_failure(status, r, getaddrinfo_callback);
+ if (retry_status == 0) {
+ // The request is being retried. Nothing should be done here
+ return;
+ }
+ /* Either no retry was attempted, or the retry failed. Either way, the
+ original error probably has more interesting information */
error = handle_addrinfo_result(status, res, r->addresses);
grpc_closure_sched(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
-
gpr_free(r->hints);
gpr_free(r);
- gpr_free(req);
uv_freeaddrinfo(res);
}
@@ -143,6 +180,7 @@ static grpc_error *blocking_resolve_address_impl(
uv_getaddrinfo_t req;
int s;
grpc_error *err;
+ int retry_status;
req.addrinfo = NULL;
@@ -158,6 +196,12 @@ static grpc_error *blocking_resolve_address_impl(
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
s = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
+ request r = {
+ .addresses = addresses, .hints = &hints, .host = host, .port = port};
+ retry_status = retry_named_port_failure(s, &r, NULL);
+ if (retry_status <= 0) {
+ s = retry_status;
+ }
err = handle_addrinfo_result(s, req.addrinfo, addresses);
done:
@@ -200,6 +244,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
r = gpr_malloc(sizeof(request));
r->on_done = on_done;
r->addresses = addrs;
+ r->host = host;
+ r->port = port;
req = gpr_malloc(sizeof(uv_getaddrinfo_t));
req->data = r;
@@ -222,6 +268,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
gpr_free(r);
gpr_free(req);
gpr_free(hints);
+ gpr_free(host);
+ gpr_free(port);
}
}
diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c
index ffa62cb53c..9d2732666b 100644
--- a/src/core/lib/iomgr/sockaddr_utils.c
+++ b/src/core/lib/iomgr/sockaddr_utils.c
@@ -162,6 +162,7 @@ int grpc_sockaddr_to_string(char **out,
char ntop_buf[INET6_ADDRSTRLEN];
const void *ip = NULL;
int port;
+ uint32_t sin6_scope_id = 0;
int ret;
*out = NULL;
@@ -177,10 +178,19 @@ int grpc_sockaddr_to_string(char **out,
const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr;
ip = &addr6->sin6_addr;
port = ntohs(addr6->sin6_port);
+ sin6_scope_id = addr6->sin6_scope_id;
}
if (ip != NULL &&
grpc_inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != NULL) {
- ret = gpr_join_host_port(out, ntop_buf, port);
+ if (sin6_scope_id != 0) {
+ char *host_with_scope;
+ /* Enclose sin6_scope_id with the format defined in RFC 6784 section 2. */
+ gpr_asprintf(&host_with_scope, "%s%%25%" PRIu32, ntop_buf, sin6_scope_id);
+ ret = gpr_join_host_port(out, host_with_scope, port);
+ gpr_free(host_with_scope);
+ } else {
+ ret = gpr_join_host_port(out, ntop_buf, port);
+ }
} else {
ret = gpr_asprintf(out, "(sockaddr family=%d)", addr->sa_family);
}
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index ae66577caf..618483d9cb 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -76,7 +76,6 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp,
const char *str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s",
connect->addr_name, str);
- grpc_error_free_string(str);
}
if (error == GRPC_ERROR_NONE) {
/* error == NONE implies that the timer ran out, and wasn't cancelled. If
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 36f878fdd4..5f286a6723 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -114,6 +114,8 @@ struct grpc_tcp_server {
/* is this server shutting down? */
bool shutdown;
+ /* have listeners been shutdown? */
+ bool shutdown_listeners;
/* use SO_REUSEPORT */
bool so_reuseport;
/* expand wildcard addresses to a list of all local addresses */
@@ -161,7 +163,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_tcp_server **server) {
gpr_once_init(&check_init, init);
- grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
s->resource_quota = grpc_resource_quota_create(NULL);
s->expand_wildcard_addrs = false;
@@ -422,7 +424,14 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
return;
default:
- gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
+ gpr_mu_lock(&sp->server->mu);
+ if (!sp->server->shutdown_listeners) {
+ gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
+ } else {
+ /* if we have shutdown listeners, accept4 could fail, and we
+ needn't notify users */
+ }
+ gpr_mu_unlock(&sp->server->mu);
goto error;
}
}
@@ -438,11 +447,6 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_fd *fdobj = grpc_fd_create(fd, name);
- if (read_notifier_pollset == NULL) {
- gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
- goto error;
- }
-
grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
// Create acceptor.
@@ -941,6 +945,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);
+ s->shutdown_listeners = true;
/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener *sp;
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 2a1c8d39fa..d1bcd89af1 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -485,7 +485,11 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
- s->active_ports++;
+ /* Registered for both read and write callbacks: increment active_ports
+ * twice to account for this, and delay free-ing of memory until both
+ * on_read and on_write have fired. */
+ s->active_ports += 2;
+
sp = sp->next;
}
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h
index 71d32d97ba..c8dd242c75 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.h
+++ b/src/core/lib/iomgr/wakeup_fd_posix.h
@@ -46,7 +46,7 @@
*
* Setup:
* 1. Before calling anything, call global_init() at least once.
- * 1. Call grpc_wakeup_fd_create() to get a wakeup_fd.
+ * 1. Call grpc_wakeup_fd_init() to set up a wakeup_fd.
* 2. Add the result of GRPC_WAKEUP_FD_FD to the set of monitored file
* descriptors for the poll() style API you are using. Monitor the file
* descriptor for readability.
diff --git a/src/core/lib/support/sync.c b/src/core/lib/support/sync.c
index 44b83f8175..b52f004f74 100644
--- a/src/core/lib/support/sync.c
+++ b/src/core/lib/support/sync.c
@@ -37,6 +37,8 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include <assert.h>
+
/* Number of mutexes to allocate for events, to avoid lock contention.
Should be a prime. */
enum { event_sync_partitions = 31 };
@@ -99,8 +101,12 @@ void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); }
void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); }
void gpr_ref_non_zero(gpr_refcount *r) {
+#ifndef NDEBUG
gpr_atm prior = gpr_atm_no_barrier_fetch_add(&r->count, 1);
- GPR_ASSERT(prior > 0);
+ assert(prior > 0);
+#else
+ gpr_ref(r);
+#endif
}
void gpr_refn(gpr_refcount *r, int n) {
@@ -113,6 +119,10 @@ int gpr_unref(gpr_refcount *r) {
return prior == 1;
}
+int gpr_ref_is_unique(gpr_refcount *r) {
+ return gpr_atm_acq_load(&r->count) == 1;
+}
+
void gpr_stats_init(gpr_stats_counter *c, intptr_t n) {
gpr_atm_rel_store(&c->value, n);
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index cc57654ea4..c2547c5147 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -161,6 +161,7 @@ struct grpc_call {
bool receiving_message;
bool requested_final_op;
bool received_final_op;
+ bool sent_any_op;
/* have we received initial metadata */
bool has_initial_md_been_received;
@@ -488,7 +489,7 @@ void grpc_call_destroy(grpc_call *c) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
- cancel = !c->received_final_op;
+ cancel = c->sent_any_op && !c->received_final_op;
gpr_mu_unlock(&c->mu);
if (cancel) {
cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
@@ -1678,6 +1679,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
+ call->sent_any_op = true;
gpr_mu_unlock(&call->mu);
execute_op(exec_ctx, call, stream_op);
diff --git a/src/core/lib/transport/error_utils.c b/src/core/lib/transport/error_utils.c
index da77828d9c..ef55e561fb 100644
--- a/src/core/lib/transport/error_utils.c
+++ b/src/core/lib/transport/error_utils.c
@@ -44,12 +44,12 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error,
}
if (grpc_error_is_special(error)) return NULL;
// Otherwise, search through its children.
- intptr_t key = 0;
- while (true) {
- grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++);
- if (child_error == NULL) break;
- grpc_error *result = recursively_find_error_with_field(child_error, which);
- if (result != NULL) return result;
+ uint8_t slot = error->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot);
+ grpc_error *result = recursively_find_error_with_field(lerr->err, which);
+ if (result) return result;
+ slot = lerr->next;
}
return NULL;
}
@@ -112,13 +112,13 @@ bool grpc_error_has_clear_grpc_status(grpc_error *error) {
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
return true;
}
- intptr_t key = 0;
- while (true) {
- grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++);
- if (child_error == NULL) break;
- if (grpc_error_has_clear_grpc_status(child_error)) {
+ uint8_t slot = error->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot);
+ if (grpc_error_has_clear_grpc_status(lerr->err)) {
return true;
}
+ slot = lerr->next;
}
return false;
}
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 004e748f25..165950e288 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -84,6 +84,39 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
}
}
+#define STREAM_REF_FROM_SLICE_REF(p) \
+ ((grpc_stream_refcount *)(((uint8_t *)p) - \
+ offsetof(grpc_stream_refcount, slice_refcount)))
+
+static void slice_stream_ref(void *p) {
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p), "slice");
+#else
+ grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p));
+#endif
+}
+
+static void slice_stream_unref(grpc_exec_ctx *exec_ctx, void *p) {
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p), "slice");
+#else
+ grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p));
+#endif
+}
+
+grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount,
+ void *buffer, size_t length) {
+ slice_stream_ref(&refcount->slice_refcount);
+ return (grpc_slice){.refcount = &refcount->slice_refcount,
+ .data.refcounted = {.bytes = buffer, .length = length}};
+}
+
+static const grpc_slice_refcount_vtable stream_ref_slice_vtable = {
+ .ref = slice_stream_ref,
+ .unref = slice_stream_unref,
+ .eq = grpc_slice_default_eq_impl,
+ .hash = grpc_slice_default_hash_impl};
+
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
grpc_iomgr_cb_func cb, void *cb_arg,
@@ -95,6 +128,8 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
#endif
gpr_ref_init(&refcount->refs, initial_refs);
grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
+ refcount->slice_refcount.vtable = &stream_ref_slice_vtable;
+ refcount->slice_refcount.sub_refcount = &refcount->slice_refcount;
}
static void move64(uint64_t *from, uint64_t *to) {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index bb23c0225a..cc1c277b35 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -64,6 +64,7 @@ typedef struct grpc_stream_refcount {
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
const char *object_type;
#endif
+ grpc_slice_refcount slice_refcount;
} grpc_stream_refcount;
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
@@ -84,6 +85,11 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount);
grpc_stream_ref_init(rc, ir, cb, cb_arg)
#endif
+/* Wrap a buffer that is owned by some stream object into a slice that shares
+ the same refcount */
+grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount,
+ void *buffer, size_t length);
+
typedef struct {
uint64_t framing_bytes;
uint64_t data_bytes;
diff --git a/src/core/lib/tsi/test_creds/BUILD b/src/core/lib/tsi/test_creds/BUILD
index dcd6d930a8..5cf04caf17 100644
--- a/src/core/lib/tsi/test_creds/BUILD
+++ b/src/core/lib/tsi/test_creds/BUILD
@@ -27,6 +27,8 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+licenses(["notice"]) # 3-clause BSD
+
exports_files([
"ca.pem",
"server1.key",