aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2017-03-17 13:02:03 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2017-03-17 16:00:17 -0700
commit9bc0d8200ec17ababe4c9cd6134a2cb7465365a6 (patch)
tree14ac57b2410205774b345de360b0886033d7d433 /src/core/lib/iomgr
parentcabb1517e5ac3cdbe59415678d4f0d284f9fabda (diff)
parent40a947ef93aeddca3b606613f628d4d09f094e77 (diff)
Merge remote-tracking branch 'upstream/master' into cares_bazel_rule
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/error.c460
-rw-r--r--src/core/lib/iomgr/error.h11
-rw-r--r--src/core/lib/iomgr/error_internal.h39
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c3
-rw-r--r--src/core/lib/iomgr/pollset.h1
-rw-r--r--src/core/lib/iomgr/pollset_uv.c31
-rw-r--r--src/core/lib/iomgr/pollset_windows.c1
-rw-r--r--src/core/lib/iomgr/port.h4
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c52
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.c12
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c4
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c415
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix.h134
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.c220
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c195
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c49
-rw-r--r--src/core/lib/iomgr/timer_generic.c9
-rw-r--r--src/core/lib/iomgr/udp_server.c22
-rw-r--r--src/core/lib/iomgr/udp_server.h12
-rw-r--r--src/core/lib/iomgr/wakeup_fd_posix.h2
20 files changed, 1083 insertions, 593 deletions
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index dbe5b139f9..1127fff756 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -35,6 +35,7 @@
#include <string.h>
+#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -47,46 +48,7 @@
#include "src/core/lib/iomgr/error_internal.h"
#include "src/core/lib/profiling/timers.h"
-
-static void destroy_integer(void *key) {}
-
-static void *copy_integer(void *key) { return key; }
-
-static long compare_integers(void *key1, void *key2) {
- return GPR_ICMP((uintptr_t)key1, (uintptr_t)key2);
-}
-
-static void destroy_string(void *str) { gpr_free(str); }
-
-static void *copy_string(void *str) { return gpr_strdup(str); }
-
-static void destroy_err(void *err) { GRPC_ERROR_UNREF(err); }
-
-static void *copy_err(void *err) { return GRPC_ERROR_REF(err); }
-
-static void destroy_time(void *tm) { gpr_free(tm); }
-
-static gpr_timespec *box_time(gpr_timespec tm) {
- gpr_timespec *out = gpr_malloc(sizeof(*out));
- *out = tm;
- return out;
-}
-
-static void *copy_time(void *tm) { return box_time(*(gpr_timespec *)tm); }
-
-static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer,
- compare_integers,
- destroy_integer, copy_integer};
-
-static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer,
- compare_integers, destroy_string,
- copy_string};
-
-static const gpr_avl_vtable avl_vtable_times = {
- destroy_integer, copy_integer, compare_integers, destroy_time, copy_time};
-
-static const gpr_avl_vtable avl_vtable_errs = {
- destroy_integer, copy_integer, compare_integers, destroy_err, copy_err};
+#include "src/core/lib/slice/slice_internal.h"
static const char *error_int_name(grpc_error_ints key) {
switch (key) {
@@ -120,6 +82,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "limit";
case GRPC_ERROR_INT_OCCURRED_DURING_WRITE:
return "occurred_during_write";
+ case GRPC_ERROR_INT_MAX:
+ GPR_UNREACHABLE_CODE(return "unknown");
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -150,6 +114,8 @@ static const char *error_str_name(grpc_error_strs key) {
return "filename";
case GRPC_ERROR_STR_QUEUED_BUFFERS:
return "queued_buffers";
+ case GRPC_ERROR_STR_MAX:
+ GPR_UNREACHABLE_CODE(return "unknown");
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -158,6 +124,8 @@ static const char *error_time_name(grpc_error_times key) {
switch (key) {
case GRPC_ERROR_TIME_CREATED:
return "created";
+ case GRPC_ERROR_TIME_MAX:
+ GPR_UNREACHABLE_CODE(return "unknown");
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -172,25 +140,51 @@ grpc_error *grpc_error_ref(grpc_error *err, const char *file, int line,
const char *func) {
if (grpc_error_is_special(err)) return err;
gpr_log(GPR_DEBUG, "%p: %" PRIdPTR " -> %" PRIdPTR " [%s:%d %s]", err,
- err->refs.count, err->refs.count + 1, file, line, func);
- gpr_ref(&err->refs);
+ gpr_atm_no_barrier_load(&err->atomics.refs.count),
+ gpr_atm_no_barrier_load(&err->atomics.refs.count) + 1, file, line,
+ func);
+ gpr_ref(&err->atomics.refs);
return err;
}
#else
grpc_error *grpc_error_ref(grpc_error *err) {
if (grpc_error_is_special(err)) return err;
- gpr_ref(&err->refs);
+ gpr_ref(&err->atomics.refs);
return err;
}
#endif
+static void unref_errs(grpc_error *err) {
+ uint8_t slot = err->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot);
+ GRPC_ERROR_UNREF(lerr->err);
+ GPR_ASSERT(err->last_err == slot ? lerr->next == UINT8_MAX
+ : lerr->next != UINT8_MAX);
+ slot = lerr->next;
+ }
+}
+
+static void unref_slice(grpc_slice slice) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_unref_internal(&exec_ctx, slice);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void unref_strs(grpc_error *err) {
+ for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) {
+ uint8_t slot = err->strs[which];
+ if (slot != UINT8_MAX) {
+ unref_slice(*(grpc_slice *)(err->arena + slot));
+ }
+ }
+}
+
static void error_destroy(grpc_error *err) {
GPR_ASSERT(!grpc_error_is_special(err));
- gpr_avl_unref(err->ints);
- gpr_avl_unref(err->strs);
- gpr_avl_unref(err->errs);
- gpr_avl_unref(err->times);
- gpr_free((void *)gpr_atm_acq_load(&err->error_string));
+ unref_errs(err);
+ unref_strs(err);
+ gpr_free((void *)gpr_atm_acq_load(&err->atomics.error_string));
gpr_free(err);
}
@@ -199,81 +193,209 @@ void grpc_error_unref(grpc_error *err, const char *file, int line,
const char *func) {
if (grpc_error_is_special(err)) return;
gpr_log(GPR_DEBUG, "%p: %" PRIdPTR " -> %" PRIdPTR " [%s:%d %s]", err,
- err->refs.count, err->refs.count - 1, file, line, func);
- if (gpr_unref(&err->refs)) {
+ gpr_atm_no_barrier_load(&err->atomics.refs.count),
+ gpr_atm_no_barrier_load(&err->atomics.refs.count) - 1, file, line,
+ func);
+ if (gpr_unref(&err->atomics.refs)) {
error_destroy(err);
}
}
#else
void grpc_error_unref(grpc_error *err) {
if (grpc_error_is_special(err)) return;
- if (gpr_unref(&err->refs)) {
+ if (gpr_unref(&err->atomics.refs)) {
error_destroy(err);
}
}
#endif
+static uint8_t get_placement(grpc_error **err, size_t size) {
+ GPR_ASSERT(*err);
+ uint8_t slots = (uint8_t)(size / sizeof(intptr_t));
+ if ((*err)->arena_size + slots > (*err)->arena_capacity) {
+ (*err)->arena_capacity = (uint8_t)(3 * (*err)->arena_capacity / 2);
+ *err = gpr_realloc(
+ *err, sizeof(grpc_error) + (*err)->arena_capacity * sizeof(intptr_t));
+ }
+ uint8_t placement = (*err)->arena_size;
+ (*err)->arena_size = (uint8_t)((*err)->arena_size + slots);
+ return placement;
+}
+
+static void internal_set_int(grpc_error **err, grpc_error_ints which,
+ intptr_t value) {
+ // GPR_ASSERT((*err)->ints[which] == UINT8_MAX); // TODO, enforce this
+ uint8_t slot = (*err)->ints[which];
+ if (slot == UINT8_MAX) {
+ slot = get_placement(err, sizeof(value));
+ }
+ (*err)->ints[which] = slot;
+ (*err)->arena[slot] = value;
+}
+
+static void internal_set_str(grpc_error **err, grpc_error_strs which,
+ grpc_slice value) {
+ // GPR_ASSERT((*err)->strs[which] == UINT8_MAX); // TODO, enforce this
+ uint8_t slot = (*err)->strs[which];
+ if (slot == UINT8_MAX) {
+ slot = get_placement(err, sizeof(value));
+ } else {
+ unref_slice(*(grpc_slice *)((*err)->arena + slot));
+ }
+ (*err)->strs[which] = slot;
+ memcpy((*err)->arena + slot, &value, sizeof(value));
+}
+
+static void internal_set_time(grpc_error **err, grpc_error_times which,
+ gpr_timespec value) {
+ // GPR_ASSERT((*err)->times[which] == UINT8_MAX); // TODO, enforce this
+ uint8_t slot = (*err)->times[which];
+ if (slot == UINT8_MAX) {
+ slot = get_placement(err, sizeof(value));
+ }
+ (*err)->times[which] = slot;
+ memcpy((*err)->arena + slot, &value, sizeof(value));
+}
+
+static void internal_add_error(grpc_error **err, grpc_error *new) {
+ grpc_linked_error new_last = {new, UINT8_MAX};
+ uint8_t slot = get_placement(err, sizeof(grpc_linked_error));
+ if ((*err)->first_err == UINT8_MAX) {
+ GPR_ASSERT((*err)->last_err == UINT8_MAX);
+ (*err)->last_err = slot;
+ (*err)->first_err = slot;
+ } else {
+ GPR_ASSERT((*err)->last_err != UINT8_MAX);
+ grpc_linked_error *old_last =
+ (grpc_linked_error *)((*err)->arena + (*err)->last_err);
+ old_last->next = slot;
+ (*err)->last_err = slot;
+ }
+ memcpy((*err)->arena + slot, &new_last, sizeof(grpc_linked_error));
+}
+
+#define SLOTS_PER_INT (sizeof(intptr_t) / sizeof(intptr_t))
+#define SLOTS_PER_STR (sizeof(grpc_slice) / sizeof(intptr_t))
+#define SLOTS_PER_TIME (sizeof(gpr_timespec) / sizeof(intptr_t))
+#define SLOTS_PER_LINKED_ERROR (sizeof(grpc_linked_error) / sizeof(intptr_t))
+
+// size of storing one int and two slices and a timespec. For line, desc, file,
+// and time created
+#define DEFAULT_ERROR_CAPACITY \
+ (SLOTS_PER_INT + (SLOTS_PER_STR * 2) + SLOTS_PER_TIME)
+
+// It is very common to include and extra int and string in an error
+#define SURPLUS_CAPACITY (2 * SLOTS_PER_INT + SLOTS_PER_TIME)
+
grpc_error *grpc_error_create(const char *file, int line, const char *desc,
grpc_error **referencing,
size_t num_referencing) {
GPR_TIMER_BEGIN("grpc_error_create", 0);
- grpc_error *err = gpr_malloc(sizeof(*err));
+ uint8_t initial_arena_capacity = (uint8_t)(
+ DEFAULT_ERROR_CAPACITY +
+ (uint8_t)(num_referencing * SLOTS_PER_LINKED_ERROR) + SURPLUS_CAPACITY);
+ grpc_error *err =
+ gpr_malloc(sizeof(*err) + initial_arena_capacity * sizeof(intptr_t));
if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL
return GRPC_ERROR_OOM;
}
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line);
#endif
- err->ints = gpr_avl_add(gpr_avl_create(&avl_vtable_ints),
- (void *)(uintptr_t)GRPC_ERROR_INT_FILE_LINE,
- (void *)(uintptr_t)line);
- err->strs = gpr_avl_add(
- gpr_avl_add(gpr_avl_create(&avl_vtable_strs),
- (void *)(uintptr_t)GRPC_ERROR_STR_FILE, gpr_strdup(file)),
- (void *)(uintptr_t)GRPC_ERROR_STR_DESCRIPTION, gpr_strdup(desc));
- err->errs = gpr_avl_create(&avl_vtable_errs);
- err->next_err = 0;
- for (size_t i = 0; i < num_referencing; i++) {
+
+ err->arena_size = 0;
+ err->arena_capacity = initial_arena_capacity;
+ err->first_err = UINT8_MAX;
+ err->last_err = UINT8_MAX;
+
+ memset(err->ints, UINT8_MAX, GRPC_ERROR_INT_MAX);
+ memset(err->strs, UINT8_MAX, GRPC_ERROR_STR_MAX);
+ memset(err->times, UINT8_MAX, GRPC_ERROR_TIME_MAX);
+
+ internal_set_int(&err, GRPC_ERROR_INT_FILE_LINE, line);
+ internal_set_str(&err, GRPC_ERROR_STR_FILE,
+ grpc_slice_from_static_string(file));
+ internal_set_str(
+ &err, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_copied_buffer(
+ desc,
+ strlen(desc) +
+ 1)); // TODO, pull this up. // TODO(ncteisen), pull this up.
+
+ for (size_t i = 0; i < num_referencing; ++i) {
if (referencing[i] == GRPC_ERROR_NONE) continue;
- err->errs = gpr_avl_add(err->errs, (void *)(err->next_err++),
- GRPC_ERROR_REF(referencing[i]));
- }
- err->times = gpr_avl_add(gpr_avl_create(&avl_vtable_times),
- (void *)(uintptr_t)GRPC_ERROR_TIME_CREATED,
- box_time(gpr_now(GPR_CLOCK_REALTIME)));
- gpr_atm_no_barrier_store(&err->error_string, 0);
- gpr_ref_init(&err->refs, 1);
+ internal_add_error(
+ &err,
+ GRPC_ERROR_REF(
+ referencing[i])); // TODO(ncteisen), change ownership semantics
+ }
+
+ internal_set_time(&err, GRPC_ERROR_TIME_CREATED, gpr_now(GPR_CLOCK_REALTIME));
+
+ gpr_atm_no_barrier_store(&err->atomics.error_string, 0);
+ gpr_ref_init(&err->atomics.refs, 1);
GPR_TIMER_END("grpc_error_create", 0);
return err;
}
+static void ref_strs(grpc_error *err) {
+ for (size_t i = 0; i < GRPC_ERROR_STR_MAX; ++i) {
+ uint8_t slot = err->strs[i];
+ if (slot != UINT8_MAX) {
+ grpc_slice_ref_internal(*(grpc_slice *)(err->arena + slot));
+ }
+ }
+}
+
+static void ref_errs(grpc_error *err) {
+ uint8_t slot = err->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot);
+ GRPC_ERROR_REF(lerr->err);
+ slot = lerr->next;
+ }
+}
+
static grpc_error *copy_error_and_unref(grpc_error *in) {
GPR_TIMER_BEGIN("copy_error_and_unref", 0);
grpc_error *out;
if (grpc_error_is_special(in)) {
- if (in == GRPC_ERROR_NONE)
- out = grpc_error_set_int(GRPC_ERROR_CREATE("no error"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK);
- else if (in == GRPC_ERROR_OOM)
- out = GRPC_ERROR_CREATE("oom");
- else if (in == GRPC_ERROR_CANCELLED)
- out =
- grpc_error_set_int(GRPC_ERROR_CREATE("cancelled"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED);
- else
- out = GRPC_ERROR_CREATE("unknown");
+ out = GRPC_ERROR_CREATE("unknown");
+ if (in == GRPC_ERROR_NONE) {
+ internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_static_string("no error"));
+ internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK);
+ } else if (in == GRPC_ERROR_OOM) {
+ internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_static_string("oom"));
+ } else if (in == GRPC_ERROR_CANCELLED) {
+ internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_static_string("cancelled"));
+ internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED);
+ }
+ } else if (gpr_ref_is_unique(&in->atomics.refs)) {
+ out = in;
} else {
- out = gpr_malloc(sizeof(*out));
+ uint8_t new_arena_capacity = in->arena_capacity;
+ // the returned err will be added to, so we ensure this is room to avoid
+ // unneeded allocations.
+ if (in->arena_capacity - in->arena_size < (uint8_t)SLOTS_PER_STR) {
+ new_arena_capacity = (uint8_t)(3 * new_arena_capacity / 2);
+ }
+ out = gpr_malloc(sizeof(*in) + new_arena_capacity * sizeof(intptr_t));
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "%p create copying %p", out, in);
#endif
- out->ints = gpr_avl_ref(in->ints);
- out->strs = gpr_avl_ref(in->strs);
- out->errs = gpr_avl_ref(in->errs);
- out->times = gpr_avl_ref(in->times);
- gpr_atm_no_barrier_store(&out->error_string, 0);
- out->next_err = in->next_err;
- gpr_ref_init(&out->refs, 1);
+ // bulk memcpy of the rest of the struct.
+ size_t skip = sizeof(&out->atomics);
+ memcpy((void *)((uintptr_t)out + skip), (void *)((uintptr_t)in + skip),
+ sizeof(*in) + (in->arena_size * sizeof(intptr_t)) - skip);
+ // manually set the atomics and the new capacity
+ gpr_atm_no_barrier_store(&out->atomics.error_string, 0);
+ gpr_ref_init(&out->atomics.refs, 1);
+ out->arena_capacity = new_arena_capacity;
+ ref_strs(out);
+ ref_errs(out);
GRPC_ERROR_UNREF(in);
}
GPR_TIMER_END("copy_error_and_unref", 0);
@@ -284,7 +406,7 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value) {
GPR_TIMER_BEGIN("grpc_error_set_int", 0);
grpc_error *new = copy_error_and_unref(src);
- new->ints = gpr_avl_add(new->ints, (void *)(uintptr_t)which, (void *)value);
+ internal_set_int(&new, which, value);
GPR_TIMER_END("grpc_error_set_int", 0);
return new;
}
@@ -302,7 +424,6 @@ static special_error_status_map error_status_map[] = {
bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) {
GPR_TIMER_BEGIN("grpc_error_get_int", 0);
- void *pp;
if (grpc_error_is_special(err)) {
if (which == GRPC_ERROR_INT_GRPC_STATUS) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); i++) {
@@ -316,8 +437,9 @@ bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) {
GPR_TIMER_END("grpc_error_get_int", 0);
return false;
}
- if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) {
- if (p != NULL) *p = (intptr_t)pp;
+ uint8_t slot = err->ints[which];
+ if (slot != UINT8_MAX) {
+ if (p != NULL) *p = err->arena[slot];
GPR_TIMER_END("grpc_error_get_int", 0);
return true;
}
@@ -329,8 +451,9 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value) {
GPR_TIMER_BEGIN("grpc_error_set_str", 0);
grpc_error *new = copy_error_and_unref(src);
- new->strs =
- gpr_avl_add(new->strs, (void *)(uintptr_t)which, gpr_strdup(value));
+ internal_set_str(&new, which,
+ grpc_slice_from_copied_buffer(
+ value, strlen(value) + 1)); // TODO, pull this up.
GPR_TIMER_END("grpc_error_set_str", 0);
return new;
}
@@ -346,13 +469,19 @@ const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) {
}
return NULL;
}
- return gpr_avl_get(err->strs, (void *)(uintptr_t)which);
+ uint8_t slot = err->strs[which];
+ if (slot != UINT8_MAX) {
+ return (const char *)GRPC_SLICE_START_PTR(
+ *(grpc_slice *)(err->arena + slot));
+ } else {
+ return NULL;
+ }
}
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
GPR_TIMER_BEGIN("grpc_error_add_child", 0);
grpc_error *new = copy_error_and_unref(src);
- new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child);
+ internal_add_error(&new, child);
GPR_TIMER_END("grpc_error_add_child", 0);
return new;
}
@@ -372,42 +501,6 @@ typedef struct {
size_t cap_kvs;
} kv_pairs;
-static void append_kv(kv_pairs *kvs, char *key, char *value) {
- if (kvs->num_kvs == kvs->cap_kvs) {
- kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4);
- kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs);
- }
- kvs->kvs[kvs->num_kvs].key = key;
- kvs->kvs[kvs->num_kvs].value = value;
- kvs->num_kvs++;
-}
-
-static void collect_kvs(gpr_avl_node *node, char *key(void *k),
- char *fmt(void *v), kv_pairs *kvs) {
- if (node == NULL) return;
- append_kv(kvs, key(node->key), fmt(node->value));
- collect_kvs(node->left, key, fmt, kvs);
- collect_kvs(node->right, key, fmt, kvs);
-}
-
-static char *key_int(void *p) {
- return gpr_strdup(error_int_name((grpc_error_ints)(uintptr_t)p));
-}
-
-static char *key_str(void *p) {
- return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p));
-}
-
-static char *key_time(void *p) {
- return gpr_strdup(error_time_name((grpc_error_times)(uintptr_t)p));
-}
-
-static char *fmt_int(void *p) {
- char *s;
- gpr_asprintf(&s, "%" PRIdPTR, (intptr_t)p);
- return s;
-}
-
static void append_chr(char c, char **s, size_t *sz, size_t *cap) {
if (*sz == *cap) {
*cap = GPR_MAX(8, 3 * *cap / 2);
@@ -459,6 +552,40 @@ static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) {
append_chr('"', s, sz, cap);
}
+static void append_kv(kv_pairs *kvs, char *key, char *value) {
+ if (kvs->num_kvs == kvs->cap_kvs) {
+ kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4);
+ kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs);
+ }
+ kvs->kvs[kvs->num_kvs].key = key;
+ kvs->kvs[kvs->num_kvs].value = value;
+ kvs->num_kvs++;
+}
+
+static char *key_int(grpc_error_ints which) {
+ return gpr_strdup(error_int_name(which));
+}
+
+static char *fmt_int(intptr_t p) {
+ char *s;
+ gpr_asprintf(&s, "%" PRIdPTR, p);
+ return s;
+}
+
+static void collect_ints_kvs(grpc_error *err, kv_pairs *kvs) {
+ for (size_t which = 0; which < GRPC_ERROR_INT_MAX; ++which) {
+ uint8_t slot = err->ints[which];
+ if (slot != UINT8_MAX) {
+ append_kv(kvs, key_int((grpc_error_ints)which),
+ fmt_int(err->arena[slot]));
+ }
+ }
+}
+
+static char *key_str(grpc_error_strs which) {
+ return gpr_strdup(error_str_name(which));
+}
+
static char *fmt_str(void *p) {
char *s = NULL;
size_t sz = 0;
@@ -468,8 +595,22 @@ static char *fmt_str(void *p) {
return s;
}
-static char *fmt_time(void *p) {
- gpr_timespec tm = *(gpr_timespec *)p;
+static void collect_strs_kvs(grpc_error *err, kv_pairs *kvs) {
+ for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) {
+ uint8_t slot = err->strs[which];
+ if (slot != UINT8_MAX) {
+ append_kv(
+ kvs, key_str((grpc_error_strs)which),
+ fmt_str(GRPC_SLICE_START_PTR(*(grpc_slice *)(err->arena + slot))));
+ }
+ }
+}
+
+static char *key_time(grpc_error_times which) {
+ return gpr_strdup(error_time_name(which));
+}
+
+static char *fmt_time(gpr_timespec tm) {
char *out;
char *pfx = "!!";
switch (tm.clock_type) {
@@ -490,24 +631,37 @@ static char *fmt_time(void *p) {
return out;
}
-static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap,
- bool *first) {
- if (n == NULL) return;
- add_errs(n->left, s, sz, cap, first);
- if (!*first) append_chr(',', s, sz, cap);
- *first = false;
- const char *e = grpc_error_string(n->value);
- append_str(e, s, sz, cap);
- add_errs(n->right, s, sz, cap, first);
+static void collect_times_kvs(grpc_error *err, kv_pairs *kvs) {
+ for (size_t which = 0; which < GRPC_ERROR_TIME_MAX; ++which) {
+ uint8_t slot = err->times[which];
+ if (slot != UINT8_MAX) {
+ append_kv(kvs, key_time((grpc_error_times)which),
+ fmt_time(*(gpr_timespec *)(err->arena + slot)));
+ }
+ }
+}
+
+static void add_errs(grpc_error *err, char **s, size_t *sz, size_t *cap) {
+ uint8_t slot = err->first_err;
+ bool first = true;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot);
+ if (!first) append_chr(',', s, sz, cap);
+ first = false;
+ const char *e = grpc_error_string(lerr->err);
+ append_str(e, s, sz, cap);
+ GPR_ASSERT(err->last_err == slot ? lerr->next == UINT8_MAX
+ : lerr->next != UINT8_MAX);
+ slot = lerr->next;
+ }
}
static char *errs_string(grpc_error *err) {
char *s = NULL;
size_t sz = 0;
size_t cap = 0;
- bool first = true;
append_chr('[', &s, &sz, &cap);
- add_errs(err->errs.root, &s, &sz, &cap, &first);
+ add_errs(err, &s, &sz, &cap);
append_chr(']', &s, &sz, &cap);
append_chr(0, &s, &sz, &cap);
return s;
@@ -546,7 +700,7 @@ const char *grpc_error_string(grpc_error *err) {
if (err == GRPC_ERROR_OOM) return oom_error_string;
if (err == GRPC_ERROR_CANCELLED) return cancelled_error_string;
- void *p = (void *)gpr_atm_acq_load(&err->error_string);
+ void *p = (void *)gpr_atm_acq_load(&err->atomics.error_string);
if (p != NULL) {
GPR_TIMER_END("grpc_error_string", 0);
return p;
@@ -555,10 +709,10 @@ const char *grpc_error_string(grpc_error *err) {
kv_pairs kvs;
memset(&kvs, 0, sizeof(kvs));
- collect_kvs(err->ints.root, key_int, fmt_int, &kvs);
- collect_kvs(err->strs.root, key_str, fmt_str, &kvs);
- collect_kvs(err->times.root, key_time, fmt_time, &kvs);
- if (!gpr_avl_is_empty(err->errs)) {
+ collect_ints_kvs(err, &kvs);
+ collect_strs_kvs(err, &kvs);
+ collect_times_kvs(err, &kvs);
+ if (err->first_err != UINT8_MAX) {
append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err));
}
@@ -566,9 +720,9 @@ const char *grpc_error_string(grpc_error *err) {
char *out = finish_kvs(&kvs);
- if (!gpr_atm_rel_cas(&err->error_string, 0, (gpr_atm)out)) {
+ if (!gpr_atm_rel_cas(&err->atomics.error_string, 0, (gpr_atm)out)) {
gpr_free(out);
- out = (char *)gpr_atm_no_barrier_load(&err->error_string);
+ out = (char *)gpr_atm_no_barrier_load(&err->atomics.error_string);
}
GPR_TIMER_END("grpc_error_string", 0);
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 2613512acb..eb953947ae 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -102,6 +102,9 @@ typedef enum {
GRPC_ERROR_INT_LIMIT,
/// chttp2: did the error occur while a write was in progress
GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
+
+ /// Must always be last
+ GRPC_ERROR_INT_MAX,
} grpc_error_ints;
typedef enum {
@@ -129,11 +132,17 @@ typedef enum {
GRPC_ERROR_STR_KEY,
/// value associated with the error
GRPC_ERROR_STR_VALUE,
+
+ /// Must always be last
+ GRPC_ERROR_STR_MAX,
} grpc_error_strs;
typedef enum {
/// timestamp of error creation
GRPC_ERROR_TIME_CREATED,
+
+ /// Must always be last
+ GRPC_ERROR_TIME_MAX,
} grpc_error_times;
/// The following "special" errors can be propagated without allocating memory.
@@ -184,8 +193,6 @@ void grpc_error_unref(grpc_error *err);
grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value) GRPC_MUST_USE_RESULT;
bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p);
-grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which,
- gpr_timespec value) GRPC_MUST_USE_RESULT;
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value) GRPC_MUST_USE_RESULT;
/// Returns NULL if the specified string is not set.
diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h
index 1c89ead4ed..7f204df1b2 100644
--- a/src/core/lib/iomgr/error_internal.h
+++ b/src/core/lib/iomgr/error_internal.h
@@ -35,18 +35,39 @@
#define GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H
#include <inttypes.h>
-#include <stdbool.h>
+#include <stdbool.h> // TODO, do we need this?
-#include <grpc/support/avl.h>
+#include <grpc/support/sync.h>
+typedef struct grpc_linked_error grpc_linked_error;
+
+struct grpc_linked_error {
+ grpc_error *err;
+ uint8_t next;
+};
+
+// c core representation of an error. See error.h for high level description of
+// this object.
struct grpc_error {
- gpr_refcount refs;
- gpr_avl ints;
- gpr_avl strs;
- gpr_avl times;
- gpr_avl errs;
- uintptr_t next_err;
- gpr_atm error_string;
+ // All atomics in grpc_error must be stored in this nested struct. The rest of
+ // the object is memcpy-ed in bulk in copy_and_unref.
+ struct atomics {
+ gpr_refcount refs;
+ gpr_atm error_string;
+ } atomics;
+ // These arrays index into dynamic arena at the bottom of the struct.
+ // UINT8_MAX is used as a sentinel value.
+ uint8_t ints[GRPC_ERROR_INT_MAX];
+ uint8_t strs[GRPC_ERROR_STR_MAX];
+ uint8_t times[GRPC_ERROR_TIME_MAX];
+ // The child errors are stored in the arena, but are effectively a linked list
+ // structure, since they are contained withing grpc_linked_error objects.
+ uint8_t first_err;
+ uint8_t last_err;
+ // The arena is dynamically reallocated with a grow factor of 1.5.
+ uint8_t arena_size;
+ uint8_t arena_capacity;
+ intptr_t arena[0];
};
bool grpc_error_is_special(grpc_error *err);
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index c03fadaebb..5ddd5313e2 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -1131,8 +1131,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
*/
static grpc_pollset_set *pollset_set_create(void) {
- grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
- memset(pollset_set, 0, sizeof(*pollset_set));
+ grpc_pollset_set *pollset_set = gpr_zalloc(sizeof(*pollset_set));
gpr_mu_init(&pollset_set->mu);
return pollset_set;
}
diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h
index c4b62a2790..e19ce697b8 100644
--- a/src/core/lib/iomgr/pollset.h
+++ b/src/core/lib/iomgr/pollset.h
@@ -53,6 +53,7 @@ typedef struct grpc_pollset grpc_pollset;
typedef struct grpc_pollset_worker grpc_pollset_worker;
size_t grpc_pollset_size(void);
+/* Initialize a pollset: assumes *pollset contains all zeros */
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu);
/* Begin shutting down the pollset, and call closure when done.
* pollset's mutex must be held */
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index 5847771108..a2f81bcd78 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -39,6 +39,7 @@
#include <string.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -57,24 +58,40 @@ int grpc_pollset_work_run_loop;
gpr_mu grpc_polling_mu;
+/* This is used solely to kick the uv loop, by setting a callback to be run
+ immediately in the next loop iteration.
+ Note: In the future, if there is a bug that involves missing wakeups in the
+ future, try adding a uv_async_t to kick the loop differently */
+uv_timer_t *dummy_uv_handle;
+
size_t grpc_pollset_size() { return sizeof(grpc_pollset); }
+void dummy_timer_cb(uv_timer_t *handle) {}
+
+void dummy_handle_close_cb(uv_handle_t *handle) { gpr_free(handle); }
+
void grpc_pollset_global_init(void) {
gpr_mu_init(&grpc_polling_mu);
+ dummy_uv_handle = gpr_malloc(sizeof(uv_timer_t));
+ uv_timer_init(uv_default_loop(), dummy_uv_handle);
grpc_pollset_work_run_loop = 1;
}
-void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
+void grpc_pollset_global_shutdown(void) {
+ gpr_mu_destroy(&grpc_polling_mu);
+ uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb);
+}
+
+static void timer_run_cb(uv_timer_t *timer) {}
+
+static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &grpc_polling_mu;
- memset(pollset, 0, sizeof(grpc_pollset));
uv_timer_init(uv_default_loop(), &pollset->timer);
pollset->shutting_down = 0;
}
-static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
-
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(!pollset->shutting_down);
@@ -82,6 +99,9 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (grpc_pollset_work_run_loop) {
// Drain any pending UV callbacks without blocking
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ } else {
+ // kick the loop once
+ uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
}
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
}
@@ -97,8 +117,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
}
}
-static void timer_run_cb(uv_timer_t *timer) {}
-
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
@@ -131,6 +149,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
+ uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index 9be73a7ce8..17043c1ea1 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -98,7 +98,6 @@ size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); }
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &grpc_polling_mu;
- memset(pollset, 0, sizeof(*pollset));
pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
&pollset->root_worker;
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index f1897bb91f..94a454c0b7 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -39,6 +39,7 @@
#if defined(GRPC_UV)
// Do nothing
#elif defined(GPR_MANYLINUX1)
+#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
@@ -65,6 +66,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_LINUX)
+#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
@@ -90,6 +92,7 @@
#define GRPC_POSIX_SOCKETUTILS
#endif
#elif defined(GPR_APPLE)
+#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
#define GRPC_HAVE_UNIX_SOCKET 1
#define GRPC_MSG_IOVLEN_TYPE int
@@ -100,6 +103,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_FREEBSD)
+#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
#define GRPC_HAVE_UNIX_SOCKET 1
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 79ff910738..4d715be94c 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -40,6 +40,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
@@ -54,8 +55,36 @@ typedef struct request {
grpc_closure *on_done;
grpc_resolved_addresses **addresses;
struct addrinfo *hints;
+ char *host;
+ char *port;
} request;
+static int retry_named_port_failure(int status, request *r,
+ uv_getaddrinfo_cb getaddrinfo_cb) {
+ if (status != 0) {
+ // This loop is copied from resolve_address_posix.c
+ char *svc[][2] = {{"http", "80"}, {"https", "443"}};
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(svc); i++) {
+ if (strcmp(r->port, svc[i][0]) == 0) {
+ int retry_status;
+ uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t));
+ req->data = r;
+ retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb,
+ r->host, svc[i][1], r->hints);
+ if (retry_status < 0 || getaddrinfo_cb == NULL) {
+ // The callback will not be called
+ gpr_free(req);
+ }
+ return retry_status;
+ }
+ }
+ }
+ /* If this function calls uv_getaddrinfo, it will return that function's
+ return value. That function only returns numbers <=0, so we can safely
+ return 1 to indicate that we never retried */
+ return 1;
+}
+
static grpc_error *handle_addrinfo_result(int status, struct addrinfo *result,
grpc_resolved_addresses **addresses) {
struct addrinfo *resp;
@@ -97,13 +126,21 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
request *r = (request *)req->data;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *error;
+ int retry_status;
+
+ gpr_free(req);
+ retry_status = retry_named_port_failure(status, r, getaddrinfo_callback);
+ if (retry_status == 0) {
+ // The request is being retried. Nothing should be done here
+ return;
+ }
+ /* Either no retry was attempted, or the retry failed. Either way, the
+ original error probably has more interesting information */
error = handle_addrinfo_result(status, res, r->addresses);
grpc_closure_sched(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
-
gpr_free(r->hints);
gpr_free(r);
- gpr_free(req);
uv_freeaddrinfo(res);
}
@@ -143,6 +180,7 @@ static grpc_error *blocking_resolve_address_impl(
uv_getaddrinfo_t req;
int s;
grpc_error *err;
+ int retry_status;
req.addrinfo = NULL;
@@ -158,6 +196,12 @@ static grpc_error *blocking_resolve_address_impl(
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
s = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
+ request r = {
+ .addresses = addresses, .hints = &hints, .host = host, .port = port};
+ retry_status = retry_named_port_failure(s, &r, NULL);
+ if (retry_status <= 0) {
+ s = retry_status;
+ }
err = handle_addrinfo_result(s, req.addrinfo, addresses);
done:
@@ -200,6 +244,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
r = gpr_malloc(sizeof(request));
r->on_done = on_done;
r->addresses = addrs;
+ r->host = host;
+ r->port = port;
req = gpr_malloc(sizeof(uv_getaddrinfo_t));
req->data = r;
@@ -222,6 +268,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
gpr_free(r);
gpr_free(req);
gpr_free(hints);
+ gpr_free(host);
+ gpr_free(port);
}
}
diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c
index ffa62cb53c..9d2732666b 100644
--- a/src/core/lib/iomgr/sockaddr_utils.c
+++ b/src/core/lib/iomgr/sockaddr_utils.c
@@ -162,6 +162,7 @@ int grpc_sockaddr_to_string(char **out,
char ntop_buf[INET6_ADDRSTRLEN];
const void *ip = NULL;
int port;
+ uint32_t sin6_scope_id = 0;
int ret;
*out = NULL;
@@ -177,10 +178,19 @@ int grpc_sockaddr_to_string(char **out,
const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr;
ip = &addr6->sin6_addr;
port = ntohs(addr6->sin6_port);
+ sin6_scope_id = addr6->sin6_scope_id;
}
if (ip != NULL &&
grpc_inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != NULL) {
- ret = gpr_join_host_port(out, ntop_buf, port);
+ if (sin6_scope_id != 0) {
+ char *host_with_scope;
+ /* Enclose sin6_scope_id with the format defined in RFC 6784 section 2. */
+ gpr_asprintf(&host_with_scope, "%s%%25%" PRIu32, ntop_buf, sin6_scope_id);
+ ret = gpr_join_host_port(out, host_with_scope, port);
+ gpr_free(host_with_scope);
+ } else {
+ ret = gpr_join_host_port(out, ntop_buf, port);
+ }
} else {
ret = gpr_asprintf(out, "(sockaddr family=%d)", addr->sa_family);
}
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index 3de0795187..618483d9cb 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -76,7 +76,6 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp,
const char *str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s",
connect->addr_name, str);
- grpc_error_free_string(str);
}
if (error == GRPC_ERROR_NONE) {
/* error == NONE implies that the timer ran out, and wasn't cancelled. If
@@ -144,8 +143,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
}
}
- connect = gpr_malloc(sizeof(grpc_uv_tcp_connect));
- memset(connect, 0, sizeof(grpc_uv_tcp_connect));
+ connect = gpr_zalloc(sizeof(grpc_uv_tcp_connect));
connect->closure = closure;
connect->endpoint = ep;
connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t));
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 36f878fdd4..b3644518f5 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -44,11 +44,8 @@
#include <errno.h>
#include <fcntl.h>
-#include <ifaddrs.h>
-#include <limits.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
-#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
@@ -67,80 +64,10 @@
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
+#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/string.h"
-#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
-
-static gpr_once s_init_max_accept_queue_size;
-static int s_max_accept_queue_size;
-
-/* one listening port */
-typedef struct grpc_tcp_listener grpc_tcp_listener;
-struct grpc_tcp_listener {
- int fd;
- grpc_fd *emfd;
- grpc_tcp_server *server;
- grpc_resolved_address addr;
- int port;
- unsigned port_index;
- unsigned fd_index;
- grpc_closure read_closure;
- grpc_closure destroyed_closure;
- struct grpc_tcp_listener *next;
- /* sibling is a linked list of all listeners for a given port. add_port and
- clone_port place all new listeners in the same sibling list. A member of
- the 'sibling' list is also a member of the 'next' list. The head of each
- sibling list has is_sibling==0, and subsequent members of sibling lists
- have is_sibling==1. is_sibling allows separate sibling lists to be
- identified while iterating through 'next'. */
- struct grpc_tcp_listener *sibling;
- int is_sibling;
-};
-
-/* the overall server */
-struct grpc_tcp_server {
- gpr_refcount refs;
- /* Called whenever accept() succeeds on a server port. */
- grpc_tcp_server_cb on_accept_cb;
- void *on_accept_cb_arg;
-
- gpr_mu mu;
-
- /* active port count: how many ports are actually still listening */
- size_t active_ports;
- /* destroyed port count: how many ports are completely destroyed */
- size_t destroyed_ports;
-
- /* is this server shutting down? */
- bool shutdown;
- /* use SO_REUSEPORT */
- bool so_reuseport;
- /* expand wildcard addresses to a list of all local addresses */
- bool expand_wildcard_addrs;
-
- /* linked list of server ports */
- grpc_tcp_listener *head;
- grpc_tcp_listener *tail;
- unsigned nports;
-
- /* List of closures passed to shutdown_starting_add(). */
- grpc_closure_list shutdown_starting;
-
- /* shutdown callback */
- grpc_closure *shutdown_complete;
-
- /* all pollsets interested in new connections */
- grpc_pollset **pollsets;
- /* number of pollsets in the pollsets array */
- size_t pollset_count;
-
- /* next pollset to assign a channel to */
- gpr_atm next_pollset_to_assign;
-
- grpc_resource_quota *resource_quota;
-};
-
static gpr_once check_init = GPR_ONCE_INIT;
static bool has_so_reuseport = false;
@@ -161,7 +88,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_tcp_server **server) {
gpr_once_init(&check_init, init);
- grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
s->resource_quota = grpc_resource_quota_create(NULL);
s->expand_wildcard_addrs = false;
@@ -299,99 +226,6 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
}
-/* get max listen queue size on linux */
-static void init_max_accept_queue_size(void) {
- int n = SOMAXCONN;
- char buf[64];
- FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r");
- if (fp == NULL) {
- /* 2.4 kernel. */
- s_max_accept_queue_size = SOMAXCONN;
- return;
- }
- if (fgets(buf, sizeof buf, fp)) {
- char *end;
- long i = strtol(buf, &end, 10);
- if (i > 0 && i <= INT_MAX && end && *end == 0) {
- n = (int)i;
- }
- }
- fclose(fp);
- s_max_accept_queue_size = n;
-
- if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
- gpr_log(GPR_INFO,
- "Suspiciously small accept queue (%d) will probably lead to "
- "connection drops",
- s_max_accept_queue_size);
- }
-}
-
-static int get_max_accept_queue_size(void) {
- gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size);
- return s_max_accept_queue_size;
-}
-
-/* Prepare a recently-created socket for listening. */
-static grpc_error *prepare_socket(int fd, const grpc_resolved_address *addr,
- bool so_reuseport, int *port) {
- grpc_resolved_address sockname_temp;
- grpc_error *err = GRPC_ERROR_NONE;
-
- GPR_ASSERT(fd >= 0);
-
- if (so_reuseport && !grpc_is_unix_socket(addr)) {
- err = grpc_set_socket_reuse_port(fd, 1);
- if (err != GRPC_ERROR_NONE) goto error;
- }
-
- err = grpc_set_socket_nonblocking(fd, 1);
- if (err != GRPC_ERROR_NONE) goto error;
- err = grpc_set_socket_cloexec(fd, 1);
- if (err != GRPC_ERROR_NONE) goto error;
- if (!grpc_is_unix_socket(addr)) {
- err = grpc_set_socket_low_latency(fd, 1);
- if (err != GRPC_ERROR_NONE) goto error;
- err = grpc_set_socket_reuse_addr(fd, 1);
- if (err != GRPC_ERROR_NONE) goto error;
- }
- err = grpc_set_socket_no_sigpipe_if_possible(fd);
- if (err != GRPC_ERROR_NONE) goto error;
-
- GPR_ASSERT(addr->len < ~(socklen_t)0);
- if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) {
- err = GRPC_OS_ERROR(errno, "bind");
- goto error;
- }
-
- if (listen(fd, get_max_accept_queue_size()) < 0) {
- err = GRPC_OS_ERROR(errno, "listen");
- goto error;
- }
-
- sockname_temp.len = sizeof(struct sockaddr_storage);
-
- if (getsockname(fd, (struct sockaddr *)sockname_temp.addr,
- (socklen_t *)&sockname_temp.len) < 0) {
- err = GRPC_OS_ERROR(errno, "getsockname");
- goto error;
- }
-
- *port = grpc_sockaddr_get_port(&sockname_temp);
- return GRPC_ERROR_NONE;
-
-error:
- GPR_ASSERT(err != GRPC_ERROR_NONE);
- if (fd >= 0) {
- close(fd);
- }
- grpc_error *ret = grpc_error_set_int(
- GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1),
- GRPC_ERROR_INT_FD, fd);
- GRPC_ERROR_UNREF(err);
- return ret;
-}
-
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg;
@@ -422,7 +256,14 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
return;
default:
- gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
+ gpr_mu_lock(&sp->server->mu);
+ if (!sp->server->shutdown_listeners) {
+ gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
+ } else {
+ /* if we have shutdown listeners, accept4 could fail, and we
+ needn't notify users */
+ }
+ gpr_mu_unlock(&sp->server->mu);
goto error;
}
}
@@ -438,11 +279,6 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_fd *fdobj = grpc_fd_create(fd, name);
- if (read_notifier_pollset == NULL) {
- gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
- goto error;
- }
-
grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
// Create acceptor.
@@ -473,216 +309,6 @@ error:
}
}
-static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
- const grpc_resolved_address *addr,
- unsigned port_index, unsigned fd_index,
- grpc_tcp_listener **listener) {
- grpc_tcp_listener *sp = NULL;
- int port = -1;
- char *addr_str;
- char *name;
-
- grpc_error *err = prepare_socket(fd, addr, s->so_reuseport, &port);
- if (err == GRPC_ERROR_NONE) {
- GPR_ASSERT(port > 0);
- grpc_sockaddr_to_string(&addr_str, addr, 1);
- gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
- gpr_mu_lock(&s->mu);
- s->nports++;
- GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
- sp = gpr_malloc(sizeof(grpc_tcp_listener));
- sp->next = NULL;
- if (s->head == NULL) {
- s->head = sp;
- } else {
- s->tail->next = sp;
- }
- s->tail = sp;
- sp->server = s;
- sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
- memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
- sp->port = port;
- sp->port_index = port_index;
- sp->fd_index = fd_index;
- sp->is_sibling = 0;
- sp->sibling = NULL;
- GPR_ASSERT(sp->emfd);
- gpr_mu_unlock(&s->mu);
- gpr_free(addr_str);
- gpr_free(name);
- }
-
- *listener = sp;
- return err;
-}
-
-/* If successful, add a listener to s for addr, set *dsmode for the socket, and
- return the *listener. */
-static grpc_error *add_addr_to_server(grpc_tcp_server *s,
- const grpc_resolved_address *addr,
- unsigned port_index, unsigned fd_index,
- grpc_dualstack_mode *dsmode,
- grpc_tcp_listener **listener) {
- grpc_resolved_address addr4_copy;
- int fd;
- grpc_error *err =
- grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd);
- if (err != GRPC_ERROR_NONE) {
- return err;
- }
- if (*dsmode == GRPC_DSMODE_IPV4 &&
- grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
- addr = &addr4_copy;
- }
- return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
-}
-
-/* Bind to "::" to get a port number not used by any address. */
-static grpc_error *get_unused_port(int *port) {
- grpc_resolved_address wild;
- grpc_sockaddr_make_wildcard6(0, &wild);
- grpc_dualstack_mode dsmode;
- int fd;
- grpc_error *err =
- grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd);
- if (err != GRPC_ERROR_NONE) {
- return err;
- }
- if (dsmode == GRPC_DSMODE_IPV4) {
- grpc_sockaddr_make_wildcard4(0, &wild);
- }
- if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) {
- err = GRPC_OS_ERROR(errno, "bind");
- close(fd);
- return err;
- }
- if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) !=
- 0) {
- err = GRPC_OS_ERROR(errno, "getsockname");
- close(fd);
- return err;
- }
- close(fd);
- *port = grpc_sockaddr_get_port(&wild);
- return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE;
-}
-
-/* Return the listener in s with address addr or NULL. */
-static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s,
- grpc_resolved_address *addr) {
- grpc_tcp_listener *l;
- gpr_mu_lock(&s->mu);
- for (l = s->head; l != NULL; l = l->next) {
- if (l->addr.len != addr->len) {
- continue;
- }
- if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) {
- break;
- }
- }
- gpr_mu_unlock(&s->mu);
- return l;
-}
-
-/* Get all addresses assigned to network interfaces on the machine and create a
- listener for each. requested_port is the port to use for every listener, or 0
- to select one random port that will be used for every listener. Set *out_port
- to the port selected. Return GRPC_ERROR_NONE only if all listeners were
- added. */
-static grpc_error *add_all_local_addrs_to_server(grpc_tcp_server *s,
- unsigned port_index,
- int requested_port,
- int *out_port) {
- struct ifaddrs *ifa = NULL;
- struct ifaddrs *ifa_it;
- unsigned fd_index = 0;
- grpc_tcp_listener *sp = NULL;
- grpc_error *err = GRPC_ERROR_NONE;
- if (requested_port == 0) {
- /* Note: There could be a race where some local addrs can listen on the
- selected port and some can't. The sane way to handle this would be to
- retry by recreating the whole grpc_tcp_server. Backing out individual
- listeners and orphaning the FDs looks like too much trouble. */
- if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) {
- return err;
- } else if (requested_port <= 0) {
- return GRPC_ERROR_CREATE("Bad get_unused_port()");
- }
- gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port);
- }
- if (getifaddrs(&ifa) != 0 || ifa == NULL) {
- return GRPC_OS_ERROR(errno, "getifaddrs");
- }
- for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) {
- grpc_resolved_address addr;
- char *addr_str = NULL;
- grpc_dualstack_mode dsmode;
- grpc_tcp_listener *new_sp = NULL;
- const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>");
- if (ifa_it->ifa_addr == NULL) {
- continue;
- } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
- addr.len = sizeof(struct sockaddr_in);
- } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
- addr.len = sizeof(struct sockaddr_in6);
- } else {
- continue;
- }
- memcpy(addr.addr, ifa_it->ifa_addr, addr.len);
- if (!grpc_sockaddr_set_port(&addr, requested_port)) {
- /* Should never happen, because we check sa_family above. */
- err = GRPC_ERROR_CREATE("Failed to set port");
- break;
- }
- if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) {
- addr_str = gpr_strdup("<error>");
- }
- gpr_log(GPR_DEBUG,
- "Adding local addr from interface %s flags 0x%x to server: %s",
- ifa_name, ifa_it->ifa_flags, addr_str);
- /* We could have multiple interfaces with the same address (e.g., bonding),
- so look for duplicates. */
- if (find_listener_with_addr(s, &addr) != NULL) {
- gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str,
- ifa_name);
- gpr_free(addr_str);
- continue;
- }
- if ((err = add_addr_to_server(s, &addr, port_index, fd_index, &dsmode,
- &new_sp)) != GRPC_ERROR_NONE) {
- char *err_str = NULL;
- grpc_error *root_err;
- if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) {
- err_str = gpr_strdup("Failed to add listener");
- }
- root_err = GRPC_ERROR_CREATE(err_str);
- gpr_free(err_str);
- gpr_free(addr_str);
- err = grpc_error_add_child(root_err, err);
- break;
- } else {
- GPR_ASSERT(requested_port == new_sp->port);
- ++fd_index;
- if (sp != NULL) {
- new_sp->is_sibling = 1;
- sp->sibling = new_sp;
- }
- sp = new_sp;
- }
- gpr_free(addr_str);
- }
- freeifaddrs(ifa);
- if (err != GRPC_ERROR_NONE) {
- return err;
- } else if (sp == NULL) {
- return GRPC_ERROR_CREATE("No local addresses");
- } else {
- *out_port = sp->port;
- return GRPC_ERROR_NONE;
- }
-}
-
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s,
unsigned port_index,
@@ -697,14 +323,16 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s,
grpc_error *v6_err = GRPC_ERROR_NONE;
grpc_error *v4_err = GRPC_ERROR_NONE;
*out_port = -1;
- if (s->expand_wildcard_addrs) {
- return add_all_local_addrs_to_server(s, port_index, requested_port,
- out_port);
+
+ if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
+ return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
+ out_port);
}
+
grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
/* Try listening on IPv6 first. */
- if ((v6_err = add_addr_to_server(s, &wild6, port_index, fd_index, &dsmode,
- &sp)) == GRPC_ERROR_NONE) {
+ if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
+ &dsmode, &sp)) == GRPC_ERROR_NONE) {
++fd_index;
requested_port = *out_port = sp->port;
if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
@@ -713,8 +341,8 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s,
}
/* If we got a v6-only socket or nothing, try adding 0.0.0.0. */
grpc_sockaddr_set_port(&wild4, requested_port);
- if ((v4_err = add_addr_to_server(s, &wild4, port_index, fd_index, &dsmode,
- &sp2)) == GRPC_ERROR_NONE) {
+ if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
+ &dsmode, &sp2)) == GRPC_ERROR_NONE) {
*out_port = sp2->port;
if (sp != NULL) {
sp2->is_sibling = 1;
@@ -752,7 +380,7 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
&fd);
if (err != GRPC_ERROR_NONE) return err;
- err = prepare_socket(fd, &listener->addr, true, &port);
+ err = grpc_tcp_server_prepare_socket(fd, &listener->addr, true, &port);
if (err != GRPC_ERROR_NONE) return err;
listener->server->nports++;
grpc_sockaddr_to_string(&addr_str, &listener->addr, 1);
@@ -824,7 +452,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = &addr6_v4mapped;
}
- if ((err = add_addr_to_server(s, addr, port_index, 0, &dsmode, &sp)) ==
+ if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
GRPC_ERROR_NONE) {
*out_port = sp->port;
}
@@ -941,6 +569,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);
+ s->shutdown_listeners = true;
/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener *sp;
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h
new file mode 100644
index 0000000000..f5dc8532f9
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_utils_posix.h
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H
+#define GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H
+
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/socket_utils_posix.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+
+/* one listening port */
+typedef struct grpc_tcp_listener {
+ int fd;
+ grpc_fd *emfd;
+ grpc_tcp_server *server;
+ grpc_resolved_address addr;
+ int port;
+ unsigned port_index;
+ unsigned fd_index;
+ grpc_closure read_closure;
+ grpc_closure destroyed_closure;
+ struct grpc_tcp_listener *next;
+ /* sibling is a linked list of all listeners for a given port. add_port and
+ clone_port place all new listeners in the same sibling list. A member of
+ the 'sibling' list is also a member of the 'next' list. The head of each
+ sibling list has is_sibling==0, and subsequent members of sibling lists
+ have is_sibling==1. is_sibling allows separate sibling lists to be
+ identified while iterating through 'next'. */
+ struct grpc_tcp_listener *sibling;
+ int is_sibling;
+} grpc_tcp_listener;
+
+/* the overall server */
+struct grpc_tcp_server {
+ gpr_refcount refs;
+ /* Called whenever accept() succeeds on a server port. */
+ grpc_tcp_server_cb on_accept_cb;
+ void *on_accept_cb_arg;
+
+ gpr_mu mu;
+
+ /* active port count: how many ports are actually still listening */
+ size_t active_ports;
+ /* destroyed port count: how many ports are completely destroyed */
+ size_t destroyed_ports;
+
+ /* is this server shutting down? */
+ bool shutdown;
+ /* have listeners been shutdown? */
+ bool shutdown_listeners;
+ /* use SO_REUSEPORT */
+ bool so_reuseport;
+ /* expand wildcard addresses to a list of all local addresses */
+ bool expand_wildcard_addrs;
+
+ /* linked list of server ports */
+ grpc_tcp_listener *head;
+ grpc_tcp_listener *tail;
+ unsigned nports;
+
+ /* List of closures passed to shutdown_starting_add(). */
+ grpc_closure_list shutdown_starting;
+
+ /* shutdown callback */
+ grpc_closure *shutdown_complete;
+
+ /* all pollsets interested in new connections */
+ grpc_pollset **pollsets;
+ /* number of pollsets in the pollsets array */
+ size_t pollset_count;
+
+ /* next pollset to assign a channel to */
+ gpr_atm next_pollset_to_assign;
+
+ grpc_resource_quota *resource_quota;
+};
+
+/* If successful, add a listener to \a s for \a addr, set \a dsmode for the
+ socket, and return the \a listener. */
+grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s,
+ const grpc_resolved_address *addr,
+ unsigned port_index, unsigned fd_index,
+ grpc_dualstack_mode *dsmode,
+ grpc_tcp_listener **listener);
+
+/* Get all addresses assigned to network interfaces on the machine and create a
+ listener for each. requested_port is the port to use for every listener, or 0
+ to select one random port that will be used for every listener. Set *out_port
+ to the port selected. Return GRPC_ERROR_NONE only if all listeners were
+ added. */
+grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s,
+ unsigned port_index,
+ int requested_port,
+ int *out_port);
+
+/* Prepare a recently-created socket for listening. */
+grpc_error *grpc_tcp_server_prepare_socket(int fd,
+ const grpc_resolved_address *addr,
+ bool so_reuseport, int *port);
+/* Ruturn true if the platform supports ifaddrs */
+bool grpc_tcp_server_have_ifaddrs(void);
+
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.c b/src/core/lib/iomgr/tcp_server_utils_posix_common.c
new file mode 100644
index 0000000000..e45e27d5ab
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.c
@@ -0,0 +1,220 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_HAVE_IFADDRS
+
+#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
+
+#include <errno.h>
+#include <limits.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/unix_sockets_posix.h"
+
+#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
+
+static gpr_once s_init_max_accept_queue_size;
+static int s_max_accept_queue_size;
+
+/* get max listen queue size on linux */
+static void init_max_accept_queue_size(void) {
+ int n = SOMAXCONN;
+ char buf[64];
+ FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r");
+ if (fp == NULL) {
+ /* 2.4 kernel. */
+ s_max_accept_queue_size = SOMAXCONN;
+ return;
+ }
+ if (fgets(buf, sizeof buf, fp)) {
+ char *end;
+ long i = strtol(buf, &end, 10);
+ if (i > 0 && i <= INT_MAX && end && *end == 0) {
+ n = (int)i;
+ }
+ }
+ fclose(fp);
+ s_max_accept_queue_size = n;
+
+ if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
+ gpr_log(GPR_INFO,
+ "Suspiciously small accept queue (%d) will probably lead to "
+ "connection drops",
+ s_max_accept_queue_size);
+ }
+}
+
+static int get_max_accept_queue_size(void) {
+ gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size);
+ return s_max_accept_queue_size;
+}
+
+static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
+ const grpc_resolved_address *addr,
+ unsigned port_index, unsigned fd_index,
+ grpc_tcp_listener **listener) {
+ grpc_tcp_listener *sp = NULL;
+ int port = -1;
+ char *addr_str;
+ char *name;
+
+ grpc_error *err =
+ grpc_tcp_server_prepare_socket(fd, addr, s->so_reuseport, &port);
+ if (err == GRPC_ERROR_NONE) {
+ GPR_ASSERT(port > 0);
+ grpc_sockaddr_to_string(&addr_str, addr, 1);
+ gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
+ gpr_mu_lock(&s->mu);
+ s->nports++;
+ GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
+ sp = gpr_malloc(sizeof(grpc_tcp_listener));
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
+ }
+ s->tail = sp;
+ sp->server = s;
+ sp->fd = fd;
+ sp->emfd = grpc_fd_create(fd, name);
+ memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
+ sp->port = port;
+ sp->port_index = port_index;
+ sp->fd_index = fd_index;
+ sp->is_sibling = 0;
+ sp->sibling = NULL;
+ GPR_ASSERT(sp->emfd);
+ gpr_mu_unlock(&s->mu);
+ gpr_free(addr_str);
+ gpr_free(name);
+ }
+
+ *listener = sp;
+ return err;
+}
+
+/* If successful, add a listener to s for addr, set *dsmode for the socket, and
+ return the *listener. */
+grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s,
+ const grpc_resolved_address *addr,
+ unsigned port_index, unsigned fd_index,
+ grpc_dualstack_mode *dsmode,
+ grpc_tcp_listener **listener) {
+ grpc_resolved_address addr4_copy;
+ int fd;
+ grpc_error *err =
+ grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
+ }
+ if (*dsmode == GRPC_DSMODE_IPV4 &&
+ grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
+ addr = &addr4_copy;
+ }
+ return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
+}
+
+/* Prepare a recently-created socket for listening. */
+grpc_error *grpc_tcp_server_prepare_socket(int fd,
+ const grpc_resolved_address *addr,
+ bool so_reuseport, int *port) {
+ grpc_resolved_address sockname_temp;
+ grpc_error *err = GRPC_ERROR_NONE;
+
+ GPR_ASSERT(fd >= 0);
+
+ if (so_reuseport && !grpc_is_unix_socket(addr)) {
+ err = grpc_set_socket_reuse_port(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ }
+
+ err = grpc_set_socket_nonblocking(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ err = grpc_set_socket_cloexec(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ if (!grpc_is_unix_socket(addr)) {
+ err = grpc_set_socket_low_latency(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ err = grpc_set_socket_reuse_addr(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ }
+ err = grpc_set_socket_no_sigpipe_if_possible(fd);
+ if (err != GRPC_ERROR_NONE) goto error;
+
+ GPR_ASSERT(addr->len < ~(socklen_t)0);
+ if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) {
+ err = GRPC_OS_ERROR(errno, "bind");
+ goto error;
+ }
+
+ if (listen(fd, get_max_accept_queue_size()) < 0) {
+ err = GRPC_OS_ERROR(errno, "listen");
+ goto error;
+ }
+
+ sockname_temp.len = sizeof(struct sockaddr_storage);
+
+ if (getsockname(fd, (struct sockaddr *)sockname_temp.addr,
+ (socklen_t *)&sockname_temp.len) < 0) {
+ err = GRPC_OS_ERROR(errno, "getsockname");
+ goto error;
+ }
+
+ *port = grpc_sockaddr_get_port(&sockname_temp);
+ return GRPC_ERROR_NONE;
+
+error:
+ GPR_ASSERT(err != GRPC_ERROR_NONE);
+ if (fd >= 0) {
+ close(fd);
+ }
+ grpc_error *ret = grpc_error_set_int(
+ GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1),
+ GRPC_ERROR_INT_FD, fd);
+ GRPC_ERROR_UNREF(err);
+ return ret;
+}
+
+#endif /* GRPC_HAVE_IFADDRS */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c
new file mode 100644
index 0000000000..6354a6bdc1
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c
@@ -0,0 +1,195 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_HAVE_IFADDRS
+
+#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
+
+#include <errno.h>
+#include <ifaddrs.h>
+#include <stddef.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+
+/* Return the listener in s with address addr or NULL. */
+static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s,
+ grpc_resolved_address *addr) {
+ grpc_tcp_listener *l;
+ gpr_mu_lock(&s->mu);
+ for (l = s->head; l != NULL; l = l->next) {
+ if (l->addr.len != addr->len) {
+ continue;
+ }
+ if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) {
+ break;
+ }
+ }
+ gpr_mu_unlock(&s->mu);
+ return l;
+}
+
+/* Bind to "::" to get a port number not used by any address. */
+static grpc_error *get_unused_port(int *port) {
+ grpc_resolved_address wild;
+ grpc_sockaddr_make_wildcard6(0, &wild);
+ grpc_dualstack_mode dsmode;
+ int fd;
+ grpc_error *err =
+ grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
+ }
+ if (dsmode == GRPC_DSMODE_IPV4) {
+ grpc_sockaddr_make_wildcard4(0, &wild);
+ }
+ if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) {
+ err = GRPC_OS_ERROR(errno, "bind");
+ close(fd);
+ return err;
+ }
+ if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) !=
+ 0) {
+ err = GRPC_OS_ERROR(errno, "getsockname");
+ close(fd);
+ return err;
+ }
+ close(fd);
+ *port = grpc_sockaddr_get_port(&wild);
+ return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE;
+}
+
+grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s,
+ unsigned port_index,
+ int requested_port,
+ int *out_port) {
+ struct ifaddrs *ifa = NULL;
+ struct ifaddrs *ifa_it;
+ unsigned fd_index = 0;
+ grpc_tcp_listener *sp = NULL;
+ grpc_error *err = GRPC_ERROR_NONE;
+ if (requested_port == 0) {
+ /* Note: There could be a race where some local addrs can listen on the
+ selected port and some can't. The sane way to handle this would be to
+ retry by recreating the whole grpc_tcp_server. Backing out individual
+ listeners and orphaning the FDs looks like too much trouble. */
+ if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) {
+ return err;
+ } else if (requested_port <= 0) {
+ return GRPC_ERROR_CREATE("Bad get_unused_port()");
+ }
+ gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port);
+ }
+ if (getifaddrs(&ifa) != 0 || ifa == NULL) {
+ return GRPC_OS_ERROR(errno, "getifaddrs");
+ }
+ for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) {
+ grpc_resolved_address addr;
+ char *addr_str = NULL;
+ grpc_dualstack_mode dsmode;
+ grpc_tcp_listener *new_sp = NULL;
+ const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>");
+ if (ifa_it->ifa_addr == NULL) {
+ continue;
+ } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
+ addr.len = sizeof(struct sockaddr_in);
+ } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
+ addr.len = sizeof(struct sockaddr_in6);
+ } else {
+ continue;
+ }
+ memcpy(addr.addr, ifa_it->ifa_addr, addr.len);
+ if (!grpc_sockaddr_set_port(&addr, requested_port)) {
+ /* Should never happen, because we check sa_family above. */
+ err = GRPC_ERROR_CREATE("Failed to set port");
+ break;
+ }
+ if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) {
+ addr_str = gpr_strdup("<error>");
+ }
+ gpr_log(GPR_DEBUG,
+ "Adding local addr from interface %s flags 0x%x to server: %s",
+ ifa_name, ifa_it->ifa_flags, addr_str);
+ /* We could have multiple interfaces with the same address (e.g., bonding),
+ so look for duplicates. */
+ if (find_listener_with_addr(s, &addr) != NULL) {
+ gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str,
+ ifa_name);
+ gpr_free(addr_str);
+ continue;
+ }
+ if ((err = grpc_tcp_server_add_addr(s, &addr, port_index, fd_index, &dsmode,
+ &new_sp)) != GRPC_ERROR_NONE) {
+ char *err_str = NULL;
+ grpc_error *root_err;
+ if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) {
+ err_str = gpr_strdup("Failed to add listener");
+ }
+ root_err = GRPC_ERROR_CREATE(err_str);
+ gpr_free(err_str);
+ gpr_free(addr_str);
+ err = grpc_error_add_child(root_err, err);
+ break;
+ } else {
+ GPR_ASSERT(requested_port == new_sp->port);
+ ++fd_index;
+ if (sp != NULL) {
+ new_sp->is_sibling = 1;
+ sp->sibling = new_sp;
+ }
+ sp = new_sp;
+ }
+ gpr_free(addr_str);
+ }
+ freeifaddrs(ifa);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
+ } else if (sp == NULL) {
+ return GRPC_ERROR_CREATE("No local addresses");
+ } else {
+ *out_port = sp->port;
+ return GRPC_ERROR_NONE;
+ }
+}
+
+bool grpc_tcp_server_have_ifaddrs(void) { return true; }
+
+#endif /* GRPC_HAVE_IFADDRS */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c
new file mode 100644
index 0000000000..95c3198be6
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#if defined(GRPC_POSIX_SOCKET) && !defined(GRPC_HAVE_IFADDRS)
+
+#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
+
+grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s,
+ unsigned port_index,
+ int requested_port,
+ int *out_port) {
+ return GRPC_ERROR_CREATE("no ifaddrs available");
+}
+
+bool grpc_tcp_server_have_ifaddrs(void) { return false; }
+
+#endif /* defined(GRPC_POSIX_SOCKET) && !defined(GRPC_HAVE_IFADDRS) */
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 6d638bcbaa..d4df96c214 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -42,6 +42,7 @@
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/time_averaged_stats.h"
#include "src/core/lib/iomgr/timer_heap.h"
+#include "src/core/lib/support/spinlock.h"
#define INVALID_HEAP_INDEX 0xffffffffu
@@ -69,7 +70,7 @@ typedef struct {
/* Protects g_shard_queue */
static gpr_mu g_mu;
/* Allow only one run_some_expired_timers at once */
-static gpr_mu g_checker_mu;
+static gpr_spinlock g_checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER;
static gpr_clock_type g_clock_type;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
@@ -90,7 +91,6 @@ void grpc_timer_list_init(gpr_timespec now) {
g_initialized = true;
gpr_mu_init(&g_mu);
- gpr_mu_init(&g_checker_mu);
g_clock_type = now.clock_type;
for (i = 0; i < NUM_SHARDS; i++) {
@@ -117,7 +117,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
grpc_timer_heap_destroy(&shard->heap);
}
gpr_mu_destroy(&g_mu);
- gpr_mu_destroy(&g_checker_mu);
g_initialized = false;
}
@@ -324,7 +323,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
/* TODO(ctiller): verify that there are any timers (atomically) here */
- if (gpr_mu_trylock(&g_checker_mu)) {
+ if (gpr_spinlock_trylock(&g_checker_mu)) {
gpr_mu_lock(&g_mu);
while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
@@ -350,7 +349,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
}
gpr_mu_unlock(&g_mu);
- gpr_mu_unlock(&g_checker_mu);
+ gpr_spinlock_unlock(&g_checker_mu);
} else if (next != NULL) {
/* TODO(ctiller): this forces calling code to do an short poll, and
then retry the timer check (because this time through the timer list was
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 2a1c8d39fa..71e295770a 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -109,8 +109,8 @@ struct grpc_udp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
- /* The parent grpc server */
- grpc_server *grpc_server;
+ /* opaque object to pass to callbacks */
+ void *user_data;
};
grpc_udp_server *grpc_udp_server_create(void) {
@@ -178,7 +178,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. */
GPR_ASSERT(sp->orphan_cb);
- sp->orphan_cb(exec_ctx, sp->emfd);
+ sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
@@ -204,7 +204,7 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
- sp->orphan_cb(exec_ctx, sp->emfd);
+ sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);
grpc_fd_shutdown(exec_ctx, sp->emfd,
GRPC_ERROR_CREATE("Server destroyed"));
}
@@ -299,7 +299,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* Tell the registered callback that data is available to read. */
GPR_ASSERT(sp->read_cb);
- sp->read_cb(exec_ctx, sp->emfd, sp->server->grpc_server);
+ sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data);
/* Re-arm the notification event so we get another chance to read. */
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
@@ -322,7 +322,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* Tell the registered callback that the socket is writeable. */
GPR_ASSERT(sp->write_cb);
- sp->write_cb(exec_ctx, sp->emfd);
+ sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data);
/* Re-arm the notification event so we get another chance to write. */
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
@@ -464,13 +464,13 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) {
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_pollset **pollsets, size_t pollset_count,
- grpc_server *server) {
+ void *user_data) {
size_t i;
gpr_mu_lock(&s->mu);
grpc_udp_listener *sp;
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
- s->grpc_server = server;
+ s->user_data = user_data;
sp = s->head;
while (sp != NULL) {
@@ -485,7 +485,11 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
- s->active_ports++;
+ /* Registered for both read and write callbacks: increment active_ports
+ * twice to account for this, and delay free-ing of memory until both
+ * on_read and on_write have fired. */
+ s->active_ports += 2;
+
sp = sp->next;
}
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index ed63fa7d81..90842a47f0 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -47,23 +47,23 @@ typedef struct grpc_udp_server grpc_udp_server;
/* Called when data is available to read from the socket. */
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
- struct grpc_server *server);
+ void *user_data);
/* Called when the socket is writeable. */
-typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx,
- grpc_fd *emfd);
+typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
+ void *user_data);
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx,
- grpc_fd *emfd);
+ grpc_fd *emfd, void *user_data);
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
-/* Start listening to bound ports */
+/* Start listening to bound ports. user_data is passed to callbacks. */
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server,
grpc_pollset **pollsets, size_t pollset_count,
- struct grpc_server *server);
+ void *user_data);
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index);
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h
index 71d32d97ba..c8dd242c75 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.h
+++ b/src/core/lib/iomgr/wakeup_fd_posix.h
@@ -46,7 +46,7 @@
*
* Setup:
* 1. Before calling anything, call global_init() at least once.
- * 1. Call grpc_wakeup_fd_create() to get a wakeup_fd.
+ * 1. Call grpc_wakeup_fd_init() to set up a wakeup_fd.
* 2. Add the result of GRPC_WAKEUP_FD_FD to the set of monitored file
* descriptors for the poll() style API you are using. Monitor the file
* descriptor for readability.