aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-24 13:33:06 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-04-24 13:33:06 -0700
commit6f2024f4eca994ba18f47502d1cebf5d4616256d (patch)
tree100a545063425d271834136a488576b5dc989442 /src/core/lib
parentdecec093169c93a8e6d033ff9971b0999a2766a3 (diff)
parent731adcc0b4b06de05444bfbd65a3a1d9aaa3ecaf (diff)
Merge github.com:grpc/grpc into c++lame
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/slice/slice_hash_table.c68
-rw-r--r--src/core/lib/slice/slice_hash_table.h19
-rw-r--r--src/core/lib/support/stack_lockfree.c6
-rw-r--r--src/core/lib/support/time_posix.c8
-rw-r--r--src/core/lib/support/tmpfile_posix.c20
-rw-r--r--src/core/lib/surface/call.c1
-rw-r--r--src/core/lib/surface/completion_queue.c186
-rw-r--r--src/core/lib/surface/completion_queue.h6
-rw-r--r--src/core/lib/surface/server.c21
-rw-r--r--src/core/lib/transport/service_config.c19
-rw-r--r--src/core/lib/transport/service_config.h4
11 files changed, 241 insertions, 117 deletions
diff --git a/src/core/lib/slice/slice_hash_table.c b/src/core/lib/slice/slice_hash_table.c
index 219567f36f..444f22aa19 100644
--- a/src/core/lib/slice/slice_hash_table.c
+++ b/src/core/lib/slice/slice_hash_table.c
@@ -42,56 +42,47 @@
struct grpc_slice_hash_table {
gpr_refcount refs;
+ void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value);
size_t size;
+ size_t max_num_probes;
grpc_slice_hash_table_entry* entries;
};
static bool is_empty(grpc_slice_hash_table_entry* entry) {
- return entry->vtable == NULL;
+ return entry->value == NULL;
}
-// Helper function for insert and get operations that performs quadratic
-// probing (https://en.wikipedia.org/wiki/Quadratic_probing).
-static size_t grpc_slice_hash_table_find_index(
- const grpc_slice_hash_table* table, const grpc_slice key, bool find_empty) {
- size_t hash = grpc_slice_hash(key);
- for (size_t i = 0; i < table->size; ++i) {
- const size_t idx = (hash + i * i) % table->size;
+static void grpc_slice_hash_table_add(grpc_slice_hash_table* table,
+ grpc_slice key, void* value) {
+ GPR_ASSERT(value != NULL);
+ const size_t hash = grpc_slice_hash(key);
+ for (size_t offset = 0; offset < table->size; ++offset) {
+ const size_t idx = (hash + offset) % table->size;
if (is_empty(&table->entries[idx])) {
- return find_empty ? idx : table->size;
- }
- if (grpc_slice_eq(table->entries[idx].key, key)) {
- return idx;
+ table->entries[idx].key = key;
+ table->entries[idx].value = value;
+ // Keep track of the maximum number of probes needed, since this
+ // provides an upper bound for lookups.
+ if (offset > table->max_num_probes) table->max_num_probes = offset;
+ return;
}
}
- return table->size; // Not found.
-}
-
-static void grpc_slice_hash_table_add(
- grpc_slice_hash_table* table, grpc_slice key, void* value,
- const grpc_slice_hash_table_vtable* vtable) {
- GPR_ASSERT(value != NULL);
- const size_t idx =
- grpc_slice_hash_table_find_index(table, key, true /* find_empty */);
- GPR_ASSERT(idx != table->size); // Table should never be full.
- grpc_slice_hash_table_entry* entry = &table->entries[idx];
- entry->key = grpc_slice_ref_internal(key);
- entry->value = vtable->copy_value(value);
- entry->vtable = vtable;
+ GPR_ASSERT(false); // Table should never be full.
}
grpc_slice_hash_table* grpc_slice_hash_table_create(
- size_t num_entries, grpc_slice_hash_table_entry* entries) {
+ size_t num_entries, grpc_slice_hash_table_entry* entries,
+ void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) {
grpc_slice_hash_table* table = gpr_zalloc(sizeof(*table));
gpr_ref_init(&table->refs, 1);
- // Quadratic probing gets best performance when the table is no more
- // than half full.
+ table->destroy_value = destroy_value;
+ // Keep load factor low to improve performance of lookups.
table->size = num_entries * 2;
const size_t entry_size = sizeof(grpc_slice_hash_table_entry) * table->size;
table->entries = gpr_zalloc(entry_size);
for (size_t i = 0; i < num_entries; ++i) {
grpc_slice_hash_table_entry* entry = &entries[i];
- grpc_slice_hash_table_add(table, entry->key, entry->value, entry->vtable);
+ grpc_slice_hash_table_add(table, entry->key, entry->value);
}
return table;
}
@@ -108,7 +99,7 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx* exec_ctx,
grpc_slice_hash_table_entry* entry = &table->entries[i];
if (!is_empty(entry)) {
grpc_slice_unref_internal(exec_ctx, entry->key);
- entry->vtable->destroy_value(exec_ctx, entry->value);
+ table->destroy_value(exec_ctx, entry->value);
}
}
gpr_free(table->entries);
@@ -118,8 +109,15 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx* exec_ctx,
void* grpc_slice_hash_table_get(const grpc_slice_hash_table* table,
const grpc_slice key) {
- const size_t idx =
- grpc_slice_hash_table_find_index(table, key, false /* find_empty */);
- if (idx == table->size) return NULL; // Not found.
- return table->entries[idx].value;
+ const size_t hash = grpc_slice_hash(key);
+ // We cap the number of probes at the max number recorded when
+ // populating the table.
+ for (size_t offset = 0; offset <= table->max_num_probes; ++offset) {
+ const size_t idx = (hash + offset) % table->size;
+ if (is_empty(&table->entries[idx])) break;
+ if (grpc_slice_eq(table->entries[idx].key, key)) {
+ return table->entries[idx].value;
+ }
+ }
+ return NULL; // Not found.
}
diff --git a/src/core/lib/slice/slice_hash_table.h b/src/core/lib/slice/slice_hash_table.h
index d0c27122d7..1e61c5eb11 100644
--- a/src/core/lib/slice/slice_hash_table.h
+++ b/src/core/lib/slice/slice_hash_table.h
@@ -37,33 +37,28 @@
/** Hash table implementation.
*
* This implementation uses open addressing
- * (https://en.wikipedia.org/wiki/Open_addressing) with quadratic
- * probing (https://en.wikipedia.org/wiki/Quadratic_probing).
+ * (https://en.wikipedia.org/wiki/Open_addressing) with linear
+ * probing (https://en.wikipedia.org/wiki/Linear_probing).
*
* The keys are \a grpc_slice objects. The values are arbitrary pointers
- * with a common vtable.
+ * with a common destroy function.
*
* Hash tables are intentionally immutable, to avoid the need for locking.
*/
typedef struct grpc_slice_hash_table grpc_slice_hash_table;
-typedef struct grpc_slice_hash_table_vtable {
- void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value);
- void *(*copy_value)(void *value);
-} grpc_slice_hash_table_vtable;
-
typedef struct grpc_slice_hash_table_entry {
grpc_slice key;
void *value; /* Must not be NULL. */
- const grpc_slice_hash_table_vtable *vtable;
} grpc_slice_hash_table_entry;
/** Creates a new hash table of containing \a entries, which is an array
- of length \a num_entries.
- Creates its own copy of all keys and values from \a entries. */
+ of length \a num_entries. Takes ownership of all keys and values in
+ \a entries. Values will be cleaned up via \a destroy_value(). */
grpc_slice_hash_table *grpc_slice_hash_table_create(
- size_t num_entries, grpc_slice_hash_table_entry *entries);
+ size_t num_entries, grpc_slice_hash_table_entry *entries,
+ void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value));
grpc_slice_hash_table *grpc_slice_hash_table_ref(grpc_slice_hash_table *table);
void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c
index c481a3e0dc..dfbd3fb125 100644
--- a/src/core/lib/support/stack_lockfree.c
+++ b/src/core/lib/support/stack_lockfree.c
@@ -76,13 +76,13 @@ struct gpr_stack_lockfree {
gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) {
gpr_stack_lockfree *stack;
- stack = gpr_malloc(sizeof(*stack));
+ stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack));
/* Since we only allocate 16 bits to represent an entry number,
* make sure that we are within the desired range */
/* Reserve the highest entry number as a dummy */
GPR_ASSERT(entries < INVALID_ENTRY_INDEX);
- stack->entries = gpr_malloc_aligned(entries * sizeof(stack->entries[0]),
- ENTRY_ALIGNMENT_BITS);
+ stack->entries = (lockfree_node *)gpr_malloc_aligned(
+ entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS);
/* Clear out all entries */
memset(stack->entries, 0, entries * sizeof(stack->entries[0]));
memset(&stack->head, 0, sizeof(stack->head));
diff --git a/src/core/lib/support/time_posix.c b/src/core/lib/support/time_posix.c
index a69c501e9f..9bfec7782a 100644
--- a/src/core/lib/support/time_posix.c
+++ b/src/core/lib/support/time_posix.c
@@ -42,6 +42,7 @@
#ifdef __linux__
#include <sys/syscall.h>
#endif
+#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/support/block_annotate.h"
@@ -144,7 +145,14 @@ static gpr_timespec now_impl(gpr_clock_type clock) {
gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type) = now_impl;
+#ifdef GPR_LOW_LEVEL_COUNTERS
+gpr_atm gpr_now_call_count;
+#endif
+
gpr_timespec gpr_now(gpr_clock_type clock_type) {
+#ifdef GPR_LOW_LEVEL_COUNTERS
+ __atomic_fetch_add(&gpr_now_call_count, 1, __ATOMIC_RELAXED);
+#endif
return gpr_now_impl(clock_type);
}
diff --git a/src/core/lib/support/tmpfile_posix.c b/src/core/lib/support/tmpfile_posix.c
index 0cd4bb6fc3..5771c158e0 100644
--- a/src/core/lib/support/tmpfile_posix.c
+++ b/src/core/lib/support/tmpfile_posix.c
@@ -50,34 +50,34 @@
FILE *gpr_tmpfile(const char *prefix, char **tmp_filename) {
FILE *result = NULL;
- char *template;
+ char *filename_template;
int fd;
if (tmp_filename != NULL) *tmp_filename = NULL;
- gpr_asprintf(&template, "/tmp/%s_XXXXXX", prefix);
- GPR_ASSERT(template != NULL);
+ gpr_asprintf(&filename_template, "/tmp/%s_XXXXXX", prefix);
+ GPR_ASSERT(filename_template != NULL);
- fd = mkstemp(template);
+ fd = mkstemp(filename_template);
if (fd == -1) {
- gpr_log(GPR_ERROR, "mkstemp failed for template %s with error %s.",
- template, strerror(errno));
+ gpr_log(GPR_ERROR, "mkstemp failed for filename_template %s with error %s.",
+ filename_template, strerror(errno));
goto end;
}
result = fdopen(fd, "w+");
if (result == NULL) {
gpr_log(GPR_ERROR, "Could not open file %s from fd %d (error = %s).",
- template, fd, strerror(errno));
- unlink(template);
+ filename_template, fd, strerror(errno));
+ unlink(filename_template);
close(fd);
goto end;
}
end:
if (result != NULL && tmp_filename != NULL) {
- *tmp_filename = template;
+ *tmp_filename = filename_template;
} else {
- gpr_free(template);
+ gpr_free(filename_template);
}
return result;
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 9aa457d792..7525806583 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -245,6 +245,7 @@ struct grpc_call {
};
int grpc_call_error_trace = 0;
+int grpc_compression_trace = 0;
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 35e9f7eb30..eae3f103b1 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -60,13 +60,154 @@ typedef struct {
void *tag;
} plucker;
+typedef struct {
+ bool can_get_pollset;
+ bool can_listen;
+ size_t (*size)(void);
+ void (*init)(grpc_pollset *pollset, gpr_mu **mu);
+ grpc_error *(*kick)(grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker);
+ grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker, gpr_timespec now,
+ gpr_timespec deadline);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure);
+ void (*destroy)(grpc_pollset *pollset);
+} cq_poller_vtable;
+
+typedef struct non_polling_worker {
+ gpr_cv cv;
+ bool kicked;
+ struct non_polling_worker *next;
+ struct non_polling_worker *prev;
+} non_polling_worker;
+
+typedef struct {
+ gpr_mu mu;
+ non_polling_worker *root;
+ grpc_closure *shutdown;
+} non_polling_poller;
+
+static size_t non_polling_poller_size(void) {
+ return sizeof(non_polling_poller);
+}
+
+static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) {
+ non_polling_poller *npp = (non_polling_poller *)pollset;
+ gpr_mu_init(&npp->mu);
+ *mu = &npp->mu;
+}
+
+static void non_polling_poller_destroy(grpc_pollset *pollset) {
+ non_polling_poller *npp = (non_polling_poller *)pollset;
+ gpr_mu_destroy(&npp->mu);
+}
+
+static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_pollset_worker **worker,
+ gpr_timespec now,
+ gpr_timespec deadline) {
+ non_polling_poller *npp = (non_polling_poller *)pollset;
+ if (npp->shutdown) return GRPC_ERROR_NONE;
+ non_polling_worker w;
+ gpr_cv_init(&w.cv);
+ if (worker != NULL) *worker = (grpc_pollset_worker *)&w;
+ if (npp->root == NULL) {
+ npp->root = w.next = w.prev = &w;
+ } else {
+ w.next = npp->root;
+ w.prev = w.next->prev;
+ w.next->prev = w.prev->next = &w;
+ }
+ w.kicked = false;
+ while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline))
+ ;
+ if (&w == npp->root) {
+ npp->root = w.next;
+ if (&w == npp->root) {
+ if (npp->shutdown) {
+ grpc_closure_sched(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
+ }
+ npp->root = NULL;
+ }
+ }
+ w.next->prev = w.prev;
+ w.prev->next = w.next;
+ gpr_cv_destroy(&w.cv);
+ if (worker != NULL) *worker = NULL;
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error *non_polling_poller_kick(
+ grpc_pollset *pollset, grpc_pollset_worker *specific_worker) {
+ non_polling_poller *p = (non_polling_poller *)pollset;
+ if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root;
+ if (specific_worker != NULL) {
+ non_polling_worker *w = (non_polling_worker *)specific_worker;
+ if (!w->kicked) {
+ w->kicked = true;
+ gpr_cv_signal(&w->cv);
+ }
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_closure *closure) {
+ non_polling_poller *p = (non_polling_poller *)pollset;
+ GPR_ASSERT(closure != NULL);
+ p->shutdown = closure;
+ if (p->root == NULL) {
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ } else {
+ non_polling_worker *w = p->root;
+ do {
+ gpr_cv_signal(&w->cv);
+ w = w->next;
+ } while (w != p->root);
+ }
+}
+
+static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
+ /* GRPC_CQ_DEFAULT_POLLING */
+ {.can_get_pollset = true,
+ .can_listen = true,
+ .size = grpc_pollset_size,
+ .init = grpc_pollset_init,
+ .kick = grpc_pollset_kick,
+ .work = grpc_pollset_work,
+ .shutdown = grpc_pollset_shutdown,
+ .destroy = grpc_pollset_destroy},
+ /* GRPC_CQ_NON_LISTENING */
+ {.can_get_pollset = true,
+ .can_listen = false,
+ .size = grpc_pollset_size,
+ .init = grpc_pollset_init,
+ .kick = grpc_pollset_kick,
+ .work = grpc_pollset_work,
+ .shutdown = grpc_pollset_shutdown,
+ .destroy = grpc_pollset_destroy},
+ /* GRPC_CQ_NON_POLLING */
+ {.can_get_pollset = false,
+ .can_listen = false,
+ .size = non_polling_poller_size,
+ .init = non_polling_poller_init,
+ .kick = non_polling_poller_kick,
+ .work = non_polling_poller_work,
+ .shutdown = non_polling_poller_shutdown,
+ .destroy = non_polling_poller_destroy},
+};
+
/* Completion queue structure */
struct grpc_completion_queue {
/** owned by pollset */
gpr_mu *mu;
grpc_cq_completion_type completion_type;
- grpc_cq_polling_type polling_type;
+
+ const cq_poller_vtable *poller_vtable;
/** completed events */
grpc_cq_completion completed_head;
@@ -127,15 +268,18 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
"polling_type=%d)",
2, (completion_type, polling_type));
- cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
- grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
+ const cq_poller_vtable *poller_vtable =
+ &g_poller_vtable_by_poller_type[polling_type];
+
+ cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
+ poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu);
#ifndef NDEBUG
cc->outstanding_tags = NULL;
cc->outstanding_tag_capacity = 0;
#endif
cc->completion_type = completion_type;
- cc->polling_type = polling_type;
+ cc->poller_vtable = poller_vtable;
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
@@ -164,10 +308,6 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
return cc->completion_type;
}
-grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) {
- return cc->polling_type;
-}
-
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
@@ -195,7 +335,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
- grpc_pollset_destroy(POLLSET_FROM_CQ(cc));
+ cc->poller_vtable->destroy(POLLSET_FROM_CQ(cc));
#ifndef NDEBUG
gpr_free(cc->outstanding_tags);
#endif
@@ -280,7 +420,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
}
}
grpc_error *kick_error =
- grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
+ cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
gpr_mu_unlock(cc->mu);
if (kick_error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(kick_error);
@@ -295,8 +435,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
- grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
- &cc->pollset_shutdown_done);
+ cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
+ &cc->pollset_shutdown_done);
gpr_mu_unlock(cc->mu);
}
@@ -452,8 +592,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_mu_lock(cc->mu);
continue;
} else {
- grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL,
- now, iteration_deadline);
+ grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+ NULL, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {
gpr_mu_unlock(cc->mu);
const char *msg = grpc_error_string(err);
@@ -644,8 +784,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(cc->mu);
} else {
- grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc),
- &worker, now, iteration_deadline);
+ grpc_error *err = cc->poller_vtable->work(
+ &exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(cc->mu);
@@ -689,8 +829,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->pending_events)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
- &cc->pollset_shutdown_done);
+ cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
+ &cc->pollset_shutdown_done);
}
gpr_mu_unlock(cc->mu);
grpc_exec_ctx_finish(&exec_ctx);
@@ -706,7 +846,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
- return POLLSET_FROM_CQ(cc);
+ return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
}
grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
@@ -727,4 +867,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
-int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
+bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
+ return cc->is_server_cq;
+}
+
+bool grpc_cq_can_listen(grpc_completion_queue *cc) {
+ return cc->poller_vtable->can_listen;
+}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 1ff3d64293..a932087939 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -94,13 +94,11 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps);
-void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc);
-bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
-int grpc_cq_is_server_cq(grpc_completion_queue *cc);
+bool grpc_cq_is_server_cq(grpc_completion_queue *cc);
+bool grpc_cq_can_listen(grpc_completion_queue *cc);
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
-grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc);
grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 26c81e9aca..934ca0431a 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -981,7 +981,7 @@ const grpc_channel_filter grpc_server_top_filter = {
static void register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
- bool is_non_listening, void *reserved) {
+ void *reserved) {
size_t i, n;
GPR_ASSERT(!reserved);
for (i = 0; i < server->cq_count; i++) {
@@ -990,10 +990,6 @@ static void register_completion_queue(grpc_server *server,
grpc_cq_mark_server_cq(cq);
- if (is_non_listening) {
- grpc_cq_mark_non_listening_server_cq(cq);
- }
-
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
@@ -1016,16 +1012,7 @@ void grpc_server_register_completion_queue(grpc_server *server,
calls grpc_completion_queue_pluck() on server completion queues */
}
- register_completion_queue(server, cq, false, reserved);
-}
-
-void grpc_server_register_non_listening_completion_queue(
- grpc_server *server, grpc_completion_queue *cq, void *reserved) {
- GRPC_API_TRACE(
- "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, "
- "reserved=%p)",
- 3, (server, cq, reserved));
- register_completion_queue(server, cq, true, reserved);
+ register_completion_queue(server, cq, reserved);
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
@@ -1033,8 +1020,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
grpc_server *server = gpr_zalloc(sizeof(grpc_server));
- GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
-
gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
gpr_cv_init(&server->starting_cv);
@@ -1123,7 +1108,7 @@ void grpc_server_start(grpc_server *server) {
server->requested_calls_per_cq =
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
- if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
+ if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]);
}
diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c
index 1195f75044..6aecb7fa93 100644
--- a/src/core/lib/transport/service_config.c
+++ b/src/core/lib/transport/service_config.c
@@ -162,7 +162,6 @@ static char* parse_json_method_name(grpc_json* json) {
static bool parse_json_method_config(
grpc_exec_ctx* exec_ctx, grpc_json* json,
void* (*create_value)(const grpc_json* method_config_json),
- const grpc_slice_hash_table_vtable* vtable,
grpc_slice_hash_table_entry* entries, size_t* idx) {
// Construct value.
void* method_config = create_value(json);
@@ -185,13 +184,11 @@ static bool parse_json_method_config(
// Add entry for each path.
for (size_t i = 0; i < paths.count; ++i) {
entries[*idx].key = grpc_slice_from_copied_string(paths.strs[i]);
- entries[*idx].value = vtable->copy_value(method_config);
- entries[*idx].vtable = vtable;
+ entries[*idx].value = method_config;
++*idx;
}
success = true;
done:
- vtable->destroy_value(exec_ctx, method_config);
gpr_strvec_destroy(&paths);
return success;
}
@@ -199,7 +196,7 @@ done:
grpc_slice_hash_table* grpc_service_config_create_method_config_table(
grpc_exec_ctx* exec_ctx, const grpc_service_config* service_config,
void* (*create_value)(const grpc_json* method_config_json),
- const grpc_slice_hash_table_vtable* vtable) {
+ void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) {
const grpc_json* json = service_config->json_tree;
// Traverse parsed JSON tree.
if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return NULL;
@@ -220,8 +217,8 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table(
size_t idx = 0;
for (grpc_json* method = field->child; method != NULL;
method = method->next) {
- if (!parse_json_method_config(exec_ctx, method, create_value, vtable,
- entries, &idx)) {
+ if (!parse_json_method_config(exec_ctx, method, create_value, entries,
+ &idx)) {
return NULL;
}
}
@@ -231,12 +228,8 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table(
// Instantiate method config table.
grpc_slice_hash_table* method_config_table = NULL;
if (entries != NULL) {
- method_config_table = grpc_slice_hash_table_create(num_entries, entries);
- // Clean up.
- for (size_t i = 0; i < num_entries; ++i) {
- grpc_slice_unref_internal(exec_ctx, entries[i].key);
- vtable->destroy_value(exec_ctx, entries[i].value);
- }
+ method_config_table =
+ grpc_slice_hash_table_create(num_entries, entries, destroy_value);
gpr_free(entries);
}
return method_config_table;
diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h
index ebfc59b534..e0548b9c3f 100644
--- a/src/core/lib/transport/service_config.h
+++ b/src/core/lib/transport/service_config.h
@@ -57,12 +57,12 @@ const char* grpc_service_config_get_lb_policy_name(
/// Creates a method config table based on the data in \a json.
/// The table's keys are request paths. The table's value type is
/// returned by \a create_value(), based on data parsed from the JSON tree.
-/// \a vtable provides methods used to manage the values.
+/// \a destroy_value is used to clean up values.
/// Returns NULL on error.
grpc_slice_hash_table* grpc_service_config_create_method_config_table(
grpc_exec_ctx* exec_ctx, const grpc_service_config* service_config,
void* (*create_value)(const grpc_json* method_config_json),
- const grpc_slice_hash_table_vtable* vtable);
+ void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value));
/// A helper function for looking up values in the table returned by
/// \a grpc_service_config_create_method_config_table().