aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--package.json2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c2
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c56
-rw-r--r--src/core/lib/iomgr/timer_generic.c68
-rw-r--r--src/core/lib/support/log_linux.c4
-rw-r--r--src/core/lib/surface/completion_queue.c633
-rw-r--r--src/core/lib/surface/completion_queue.h3
-rw-r--r--src/core/lib/surface/server.c5
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.h4
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m112
-rw-r--r--src/objective-c/tests/RxLibraryUnitTests.m26
-rw-r--r--templates/package.json.template2
-rw-r--r--test/core/surface/completion_queue_test.c2
-rw-r--r--tools/internal_ci/linux/grpc_interop_tocloud.cfg2
-rwxr-xr-xtools/internal_ci/linux/grpc_interop_tocloud.sh2
-rw-r--r--tools/internal_ci/linux/grpc_interop_toprod.cfg26
-rw-r--r--tools/internal_ci/linux/grpc_interop_toprod.sh32
-rwxr-xr-xtools/run_tests/run_interop_tests.py19
18 files changed, 608 insertions, 392 deletions
diff --git a/package.json b/package.json
index d5eec72fc9..b4b16635c6 100644
--- a/package.json
+++ b/package.json
@@ -56,7 +56,7 @@
},
"binary": {
"module_name": "grpc_node",
- "module_path": "src/node/extension_binary",
+ "module_path": "src/node/extension_binary/{node_abi}-{platform}-{arch}",
"host": "https://storage.googleapis.com/",
"remote_path": "grpc-precompiled-binaries/node/{name}/v{version}",
"package_name": "{node_abi}-{platform}-{arch}.tar.gz"
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 56d340b8c2..fbef79ec31 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -158,6 +158,7 @@ static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ sd->user_data = NULL;
}
}
gpr_free(subchannel_list->subchannels);
@@ -578,6 +579,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ sd->user_data = NULL;
}
if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 5574838187..5690431759 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -103,6 +103,32 @@ typedef struct pollable {
grpc_pollset_worker *root_worker;
} pollable;
+static const char *polling_obj_type_string(polling_obj_type t) {
+ switch (t) {
+ case PO_POLLING_GROUP:
+ return "polling_group";
+ case PO_POLLSET_SET:
+ return "pollset_set";
+ case PO_POLLSET:
+ return "pollset";
+ case PO_FD:
+ return "fd";
+ case PO_EMPTY_POLLABLE:
+ return "empty_pollable";
+ case PO_COUNT:
+ return "<invalid:count>";
+ }
+ return "<invalid>";
+}
+
+static char *pollable_desc(pollable *p) {
+ char *out;
+ gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
+ polling_obj_type_string(p->po.type), p->po.group, p->epfd,
+ p->wakeup.read_fd);
+ return out;
+}
+
static pollable g_empty_pollable;
static void pollable_init(pollable *p, polling_obj_type type);
@@ -472,7 +498,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
GPR_ASSERT(epfd != -1);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "add fd %p to pollable %p", fd, p);
+ gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
}
gpr_mu_lock(&fd->orphaned_mu);
@@ -537,10 +563,18 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
if (worker->pollable != &pollset->pollable) {
gpr_mu_lock(&worker->pollable->po.mu);
}
- if (worker->initialized_cv) {
+ if (worker->initialized_cv && worker != pollset->root_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
+ pollset, worker, &pollset->pollable, worker->pollable);
+ }
worker->kicked = true;
gpr_cv_signal(&worker->cv);
} else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
+ pollset, worker, &pollset->pollable, worker->pollable);
+ }
append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup),
"pollset_shutdown");
}
@@ -770,7 +804,9 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int timeout = poll_deadline_to_millis_timeout(deadline, now);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout);
+ char *desc = pollable_desc(p);
+ gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout);
+ gpr_free(desc);
}
if (timeout != 0) {
@@ -985,10 +1021,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
static const char *err_desc = "pollset_add_fd";
grpc_error *error = GRPC_ERROR_NONE;
if (pollset->current_pollable == &g_empty_pollable) {
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from empty to fd", pollset,
fd);
+ }
/* empty pollable --> single fd pollable */
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &fd->pollable;
@@ -997,16 +1034,23 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu);
REF_BY(fd, 2, "pollset_pollable");
} else if (pollset->current_pollable == &pollset->pollable) {
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd);
+ }
append_error(&error, pollable_add_fd(pollset->current_pollable, fd),
err_desc);
} else if (pollset->current_pollable != &fd->pollable) {
grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable;
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from fd %p to multipoller",
pollset, fd, had_fd);
+ }
+ /* Introduce a spurious completion.
+ If we do not, then it may be that the fd-specific epoll set consumed
+ a completion without being polled, leading to a missed edge going up. */
+ grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure);
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &pollset->pollable;
if (append_error(&error, pollable_materialize(&pollset->pollable),
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index bf73d2c685..e6a9eb0e86 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -44,41 +44,63 @@
grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false);
grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false);
+/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
+ * deadlines earlier than 'queue_deadline" cap are maintained in the heap and
+ * others are maintained in the list (unordered). This helps to keep the number
+ * of elements in the heap low.
+ *
+ * The 'queue_deadline_cap' gets recomputed periodically based on the timer
+ * stats maintained in 'stats' and the relevant timers are then moved from the
+ * 'list' to 'heap'
+ */
typedef struct {
gpr_mu mu;
grpc_time_averaged_stats stats;
/* All and only timers with deadlines <= this will be in the heap. */
gpr_atm queue_deadline_cap;
+ /* The deadline of the next timer due in this shard */
gpr_atm min_deadline;
- /* Index in the g_shard_queue */
+ /* Index of this timer_shard in the g_shard_queue */
uint32_t shard_queue_index;
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
list have the top bit of their deadline set to 0. */
grpc_timer_heap heap;
/* This holds timers whose deadline is >= queue_deadline_cap. */
grpc_timer list;
-} shard_type;
+} timer_shard;
+
+/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
+ * is hashed to select the timer shard to add the timer to */
+static timer_shard g_shards[NUM_SHARDS];
+
+/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
+ * the deadline of the next timer in each shard).
+ * Access to this is protected by g_shared_mutables.mu */
+static timer_shard *g_shard_queue[NUM_SHARDS];
+
+/* Thread local variable that stores the deadline of the next timer the thread
+ * has last-seen. This is an optimization to prevent the thread from checking
+ * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
+ * an expensive operation) */
+GPR_TLS_DECL(g_last_seen_min_timer);
struct shared_mutables {
+ /* The deadline of the next timer due across all timer shards */
gpr_atm min_timer;
/* Allow only one run_some_expired_timers at once */
gpr_spinlock checker_mu;
bool initialized;
- /* Protects g_shard_queue */
+ /* Protects g_shard_queue (and the shared_mutables struct itself) */
gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
static struct shared_mutables g_shared_mutables = {
.checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false,
};
+
static gpr_clock_type g_clock_type;
-static shard_type g_shards[NUM_SHARDS];
-/* Protected by g_shared_mutables.mu */
-static shard_type *g_shard_queue[NUM_SHARDS];
static gpr_timespec g_start_time;
-GPR_TLS_DECL(g_last_seen_min_timer);
-
static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
if (a > GPR_ATM_MAX - b) {
return GPR_ATM_MAX;
@@ -122,7 +144,7 @@ static gpr_timespec atm_to_timespec(gpr_atm x) {
return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0));
}
-static gpr_atm compute_min_deadline(shard_type *shard) {
+static gpr_atm compute_min_deadline(timer_shard *shard) {
return grpc_timer_heap_is_empty(&shard->heap)
? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline;
@@ -142,7 +164,7 @@ void grpc_timer_list_init(gpr_timespec now) {
grpc_register_tracer("timer_check", &grpc_timer_check_trace);
for (i = 0; i < NUM_SHARDS; i++) {
- shard_type *shard = &g_shards[i];
+ timer_shard *shard = &g_shards[i];
gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
0.5);
@@ -161,7 +183,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
exec_ctx, GPR_ATM_MAX, NULL,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
for (i = 0; i < NUM_SHARDS; i++) {
- shard_type *shard = &g_shards[i];
+ timer_shard *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
grpc_timer_heap_destroy(&shard->heap);
}
@@ -187,7 +209,7 @@ static void list_remove(grpc_timer *timer) {
}
static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
- shard_type *temp;
+ timer_shard *temp;
temp = g_shard_queue[first_shard_queue_index];
g_shard_queue[first_shard_queue_index] =
g_shard_queue[first_shard_queue_index + 1];
@@ -198,7 +220,7 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
first_shard_queue_index + 1;
}
-static void note_deadline_change(shard_type *shard) {
+static void note_deadline_change(timer_shard *shard) {
while (shard->shard_queue_index > 0 &&
shard->min_deadline <
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
@@ -215,7 +237,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now) {
int is_first_timer = 0;
- shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+ timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure;
@@ -303,7 +325,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
return;
}
- shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+ timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
gpr_mu_lock(&shard->mu);
if (GRPC_TRACER_ON(grpc_timer_trace)) {
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
@@ -321,12 +343,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
gpr_mu_unlock(&shard->mu);
}
-/* This is called when the queue is empty and "now" has reached the
- queue_deadline_cap. We compute a new queue deadline and then scan the map
- for timers that fall at or under it. Returns true if the queue is no
- longer empty.
+/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving
+ all relevant timers in shard->list (i.e timers with deadlines earlier than
+ 'queue_deadline_cap') into into shard->heap.
+ Returns 'true' if shard->heap has atleast ONE element
REQUIRES: shard->mu locked */
-static int refill_queue(shard_type *shard, gpr_atm now) {
+static int refill_heap(timer_shard *shard, gpr_atm now) {
/* Compute the new queue window width and bound by the limits: */
double computed_deadline_delta =
grpc_time_averaged_stats_update_average(&shard->stats) *
@@ -363,7 +385,7 @@ static int refill_queue(shard_type *shard, gpr_atm now) {
/* This pops the next non-cancelled timer with deadline <= now from the
queue, or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
-static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
+static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) {
grpc_timer *timer;
for (;;) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@@ -373,7 +395,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
}
if (grpc_timer_heap_is_empty(&shard->heap)) {
if (now < shard->queue_deadline_cap) return NULL;
- if (!refill_queue(shard, now)) return NULL;
+ if (!refill_heap(shard, now)) return NULL;
}
timer = grpc_timer_heap_top(&shard->heap);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@@ -393,7 +415,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
}
/* REQUIRES: shard->mu unlocked */
-static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
+static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard,
gpr_atm now, gpr_atm *new_min_deadline,
grpc_error *error) {
size_t n = 0;
diff --git a/src/core/lib/support/log_linux.c b/src/core/lib/support/log_linux.c
index 5c512661a3..61d2346427 100644
--- a/src/core/lib/support/log_linux.c
+++ b/src/core/lib/support/log_linux.c
@@ -64,6 +64,8 @@ void gpr_default_log(gpr_log_func_args *args) {
time_t timer;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
struct tm tm;
+ static __thread long tid = 0;
+ if (tid == 0) tid = gettid();
timer = (time_t)now.tv_sec;
final_slash = strrchr(args->file, '/');
@@ -81,7 +83,7 @@ void gpr_default_log(gpr_log_func_args *args) {
gpr_asprintf(&prefix, "%s%s.%09" PRId32 " %7ld %s:%d]",
gpr_log_severity_string(args->severity), time_buffer,
- now.tv_nsec, gettid(), display_file, args->line);
+ now.tv_nsec, tid, display_file, args->line);
fprintf(stderr, "%-60s %s\n", prefix, args->message);
gpr_free(prefix);
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index b04aee6c73..14e55eda85 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -189,16 +189,19 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
- size_t (*size)();
- void (*begin_op)(grpc_completion_queue *cc, void *tag);
- void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag,
+ size_t data_size;
+ void (*init)(void *data);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
+ void (*destroy)(void *data);
+ void (*begin_op)(grpc_completion_queue *cq, void *tag);
+ void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
- grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline,
+ grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved);
- grpc_event (*pluck)(grpc_completion_queue *cc, void *tag,
+ grpc_event (*pluck)(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved);
} cq_vtable;
@@ -218,25 +221,28 @@ typedef struct grpc_cq_event_queue {
gpr_atm num_queue_items;
} grpc_cq_event_queue;
-/* TODO: sreek Refactor this based on the completion_type. Put completion-type
- * specific data in a different structure (and co-allocate memory for it along
- * with completion queue + pollset )*/
-typedef struct cq_data {
- gpr_mu *mu;
+typedef struct cq_next_data {
+ /** Completed events for completion-queues of type GRPC_CQ_NEXT */
+ grpc_cq_event_queue queue;
+ /** Counter of how many things have ever been queued on this completion queue
+ useful for avoiding locks to check the queue */
+ gpr_atm things_queued_ever;
+
+ /* Number of outstanding events (+1 if not shut down) */
+ gpr_atm pending_events;
+
+ int shutdown_called;
+} cq_next_data;
+
+typedef struct cq_pluck_data {
/** Completed events for completion-queues of type GRPC_CQ_PLUCK */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
- /** Completed events for completion-queues of type GRPC_CQ_NEXT */
- grpc_cq_event_queue queue;
-
/** Number of pending events (+1 if we're not shutdown) */
gpr_refcount pending_events;
- /** Once owning_refs drops to zero, we will destroy the cq */
- gpr_refcount owning_refs;
-
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
@@ -245,37 +251,45 @@ typedef struct cq_data {
gpr_atm shutdown;
int shutdown_called;
- int is_server_cq;
-
int num_pluckers;
- int num_polls;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
- grpc_closure pollset_shutdown_done;
+} cq_pluck_data;
+
+/* Completion queue structure */
+struct grpc_completion_queue {
+ /** Once owning_refs drops to zero, we will destroy the cq */
+ gpr_refcount owning_refs;
+
+ gpr_mu *mu;
+
+ const cq_vtable *vtable;
+ const cq_poller_vtable *poller_vtable;
#ifndef NDEBUG
void **outstanding_tags;
size_t outstanding_tag_count;
size_t outstanding_tag_capacity;
#endif
-} cq_data;
-/* Completion queue structure */
-struct grpc_completion_queue {
- cq_data data;
- const cq_vtable *vtable;
- const cq_poller_vtable *poller_vtable;
+ grpc_closure pollset_shutdown_done;
+ int num_polls;
};
/* Forward declarations */
-static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc);
-
-static size_t cq_size(grpc_completion_queue *cc);
-
-static void cq_begin_op(grpc_completion_queue *cc, void *tag);
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+
+static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
@@ -283,39 +297,51 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
void *done_arg, grpc_cq_completion *storage);
static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
-static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved);
-static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved);
+static void cq_init_next(void *data);
+static void cq_init_pluck(void *data);
+static void cq_destroy_next(void *data);
+static void cq_destroy_pluck(void *data);
+
/* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = {
/* GRPC_CQ_NEXT */
- {.cq_completion_type = GRPC_CQ_NEXT,
- .size = cq_size,
- .begin_op = cq_begin_op,
+ {.data_size = sizeof(cq_next_data),
+ .cq_completion_type = GRPC_CQ_NEXT,
+ .init = cq_init_next,
+ .shutdown = cq_shutdown_next,
+ .destroy = cq_destroy_next,
+ .begin_op = cq_begin_op_for_next,
.end_op = cq_end_op_for_next,
.next = cq_next,
.pluck = NULL},
/* GRPC_CQ_PLUCK */
- {.cq_completion_type = GRPC_CQ_PLUCK,
- .size = cq_size,
- .begin_op = cq_begin_op,
+ {.data_size = sizeof(cq_pluck_data),
+ .cq_completion_type = GRPC_CQ_PLUCK,
+ .init = cq_init_pluck,
+ .shutdown = cq_shutdown_pluck,
+ .destroy = cq_destroy_pluck,
+ .begin_op = cq_begin_op_for_pluck,
.end_op = cq_end_op_for_pluck,
.next = NULL,
.pluck = cq_pluck},
};
-#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
-#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1)
+#define DATA_FROM_CQ(cq) ((void *)(cq + 1))
+#define POLLSET_FROM_CQ(cq) \
+ ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq)))
grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true);
grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
@@ -329,7 +355,7 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
gpr_free(_ev); \
}
-static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
+static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq,
grpc_error *error);
static void cq_event_queue_init(grpc_cq_event_queue *q) {
@@ -342,9 +368,9 @@ static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
gpr_mpscq_destroy(&q->queue);
}
-static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
+static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
- gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1);
+ return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
}
static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
@@ -367,16 +393,10 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
}
-static size_t cq_size(grpc_completion_queue *cc) {
- /* Size of the completion queue and the size of the pollset whose memory is
- allocated right after that of completion queue */
- return sizeof(grpc_completion_queue) + cc->poller_vtable->size();
-}
-
grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type) {
- grpc_completion_queue *cc;
+ grpc_completion_queue *cq;
GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
@@ -389,158 +409,173 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
const cq_poller_vtable *poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
- cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
- cq_data *cqd = &cc->data;
+ cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
+ poller_vtable->size());
- cc->vtable = vtable;
- cc->poller_vtable = poller_vtable;
+ cq->vtable = vtable;
+ cq->poller_vtable = poller_vtable;
- poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu);
+ /* One for destroy(), one for pollset_shutdown */
+ gpr_ref_init(&cq->owning_refs, 2);
-#ifndef NDEBUG
- cqd->outstanding_tags = NULL;
- cqd->outstanding_tag_capacity = 0;
-#endif
+ poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
+ vtable->init(DATA_FROM_CQ(cq));
+
+ GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
+ grpc_schedule_on_exec_ctx);
+
+ GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
+
+ return cq;
+}
+static void cq_init_next(void *ptr) {
+ cq_next_data *cqd = ptr;
+ /* Initial ref is dropped by grpc_completion_queue_shutdown */
+ gpr_atm_no_barrier_store(&cqd->pending_events, 1);
+ cqd->shutdown_called = false;
+ gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
+ cq_event_queue_init(&cqd->queue);
+}
+
+static void cq_destroy_next(void *ptr) {
+ cq_next_data *cqd = ptr;
+ GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
+ cq_event_queue_destroy(&cqd->queue);
+}
+
+static void cq_init_pluck(void *ptr) {
+ cq_pluck_data *cqd = ptr;
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cqd->pending_events, 1);
- /* One for destroy(), one for pollset_shutdown */
- gpr_ref_init(&cqd->owning_refs, 2);
cqd->completed_tail = &cqd->completed_head;
cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
gpr_atm_no_barrier_store(&cqd->shutdown, 0);
cqd->shutdown_called = 0;
- cqd->is_server_cq = 0;
cqd->num_pluckers = 0;
- cqd->num_polls = 0;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
-#ifndef NDEBUG
- cqd->outstanding_tag_count = 0;
-#endif
- cq_event_queue_init(&cqd->queue);
- GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc,
- grpc_schedule_on_exec_ctx);
-
- GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
+}
- return cc;
+static void cq_destroy_pluck(void *ptr) {
+ cq_pluck_data *cqd = ptr;
+ GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
}
-grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
- return cc->vtable->cq_completion_type;
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) {
+ return cq->vtable->cq_completion_type;
}
-int grpc_get_cq_poll_num(grpc_completion_queue *cc) {
+int grpc_get_cq_poll_num(grpc_completion_queue *cq) {
int cur_num_polls;
- gpr_mu_lock(cc->data.mu);
- cur_num_polls = cc->data.num_polls;
- gpr_mu_unlock(cc->data.mu);
+ gpr_mu_lock(cq->mu);
+ cur_num_polls = cq->num_polls;
+ gpr_mu_unlock(cq->mu);
return cur_num_polls;
}
#ifndef NDEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason,
const char *file, int line) {
- cq_data *cqd = &cc->data;
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
- gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
+ gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val + 1,
+ "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
reason);
}
#else
-void grpc_cq_internal_ref(grpc_completion_queue *cc) {
- cq_data *cqd = &cc->data;
+void grpc_cq_internal_ref(grpc_completion_queue *cq) {
#endif
- gpr_ref(&cqd->owning_refs);
+ gpr_ref(&cq->owning_refs);
}
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_completion_queue *cc = arg;
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy");
+ grpc_completion_queue *cq = arg;
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
}
#ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
const char *reason, const char *file, int line) {
- cq_data *cqd = &cc->data;
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
- gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
+ gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val - 1,
+ "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
reason);
}
#else
void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc) {
- cq_data *cqd = &cc->data;
+ grpc_completion_queue *cq) {
#endif
- if (gpr_unref(&cqd->owning_refs)) {
- GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
- cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc));
- cq_event_queue_destroy(&cqd->queue);
+ if (gpr_unref(&cq->owning_refs)) {
+ cq->vtable->destroy(DATA_FROM_CQ(cq));
+ cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
- gpr_free(cqd->outstanding_tags);
+ gpr_free(cq->outstanding_tags);
#endif
- gpr_free(cc);
+ gpr_free(cq);
}
}
-static void cq_begin_op(grpc_completion_queue *cc, void *tag) {
- cq_data *cqd = &cc->data;
-#ifndef NDEBUG
- gpr_mu_lock(cqd->mu);
+static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+ GPR_ASSERT(!cqd->shutdown_called);
+ gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
+}
+
+static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
- if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) {
- cqd->outstanding_tag_capacity =
- GPR_MAX(4, 2 * cqd->outstanding_tag_capacity);
- cqd->outstanding_tags =
- gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) *
- cqd->outstanding_tag_capacity);
- }
- cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag;
- gpr_mu_unlock(cqd->mu);
-#endif
gpr_ref(&cqd->pending_events);
}
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
- cc->vtable->begin_op(cc, tag);
+void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+ gpr_mu_lock(cq->mu);
+ if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+ cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+ cq->outstanding_tags =
+ gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+ cq->outstanding_tag_capacity);
+ }
+ cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+ gpr_mu_unlock(cq->mu);
+#endif
+ cq->vtable->begin_op(cq, tag);
}
#ifndef NDEBUG
-static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {
- cq_data *cqd = &cc->data;
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
if (lock_cq) {
- gpr_mu_lock(cqd->mu);
+ gpr_mu_lock(cq->mu);
}
- for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) {
- if (cqd->outstanding_tags[i] == tag) {
- cqd->outstanding_tag_count--;
- GPR_SWAP(void *, cqd->outstanding_tags[i],
- cqd->outstanding_tags[cqd->outstanding_tag_count]);
+ for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
+ if (cq->outstanding_tags[i] == tag) {
+ cq->outstanding_tag_count--;
+ GPR_SWAP(void *, cq->outstanding_tags[i],
+ cq->outstanding_tags[cq->outstanding_tag_count]);
found = 1;
break;
}
}
if (lock_cq) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
}
GPR_ASSERT(found);
}
#else
-static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {}
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
-/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion
* type of GRPC_CQ_NEXT) */
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
@@ -553,16 +588,16 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
error != GRPC_ERROR_NONE)) {
const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+ "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+ 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
}
- cq_data *cqd = &cc->data;
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
storage->tag = tag;
@@ -570,28 +605,42 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
storage->done_arg = done_arg;
storage->next = (uintptr_t)(is_success);
- cq_check_tag(cc, tag, true); /* Used in debug builds only */
+ cq_check_tag(cq, tag, true); /* Used in debug builds only */
/* Add the completion to the queue */
- cq_event_queue_push(&cqd->queue, storage);
+ bool is_first = cq_event_queue_push(&cqd->queue, storage);
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
-
- gpr_mu_lock(cqd->mu);
-
- int shutdown = gpr_unref(&cqd->pending_events);
- if (!shutdown) {
- grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL);
- gpr_mu_unlock(cqd->mu);
-
- if (kick_error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(kick_error);
- gpr_log(GPR_ERROR, "Kick failed: %s", msg);
-
- GRPC_ERROR_UNREF(kick_error);
+ bool will_definitely_shutdown =
+ gpr_atm_no_barrier_load(&cqd->pending_events) == 1;
+
+ if (!will_definitely_shutdown) {
+ /* Only kick if this is the first item queued */
+ if (is_first) {
+ gpr_mu_lock(cq->mu);
+ grpc_error *kick_error =
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+ gpr_mu_unlock(cq->mu);
+
+ if (kick_error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(kick_error);
+ gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+ GRPC_ERROR_UNREF(kick_error);
+ }
+ }
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
} else {
- cq_finish_shutdown(exec_ctx, cc);
- gpr_mu_unlock(cqd->mu);
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_atm_rel_store(&cqd->pending_events, 0);
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
GPR_TIMER_END("cq_end_op_for_next", 0);
@@ -599,16 +648,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion
* type of GRPC_CQ_PLUCK) */
static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
@@ -618,9 +668,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
error != GRPC_ERROR_NONE)) {
const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+ "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+ 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -632,8 +682,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
storage->done_arg = done_arg;
storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success));
- gpr_mu_lock(cqd->mu);
- cq_check_tag(cc, tag, false); /* Used in debug builds only */
+ gpr_mu_lock(cq->mu);
+ cq_check_tag(cq, tag, false); /* Used in debug builds only */
/* Add to the list of completions */
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
@@ -652,9 +702,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
}
grpc_error *kick_error =
- cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(kick_error);
@@ -663,8 +713,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(kick_error);
}
} else {
- cq_finish_shutdown(exec_ctx, cc);
- gpr_mu_unlock(cqd->mu);
+ cq_finish_shutdown_pluck(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
}
GPR_TIMER_END("cq_end_op_for_pluck", 0);
@@ -672,12 +722,12 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
void *tag, grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage);
+ cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
}
typedef struct {
@@ -692,7 +742,7 @@ typedef struct {
static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
- cq_data *cqd = &cq->data;
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -703,7 +753,8 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
/* Pop a cq_completion from the queue. Returns NULL if the queue is empty
- * might return NULL in some cases even if the queue is not empty; but that
+ * might return NULL in some cases even if the queue is not empty; but
+ * that
* is ok and doesn't affect correctness. Might effect the tail latencies a
* bit) */
a->stolen_completion = cq_event_queue_pop(&cqd->queue);
@@ -716,58 +767,56 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
#ifndef NDEBUG
-static void dump_pending_tags(grpc_completion_queue *cc) {
+static void dump_pending_tags(grpc_completion_queue *cq) {
if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return;
- cq_data *cqd = &cc->data;
-
gpr_strvec v;
gpr_strvec_init(&v);
gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
- gpr_mu_lock(cqd->mu);
- for (size_t i = 0; i < cqd->outstanding_tag_count; i++) {
+ gpr_mu_lock(cq->mu);
+ for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
char *s;
- gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]);
+ gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
gpr_strvec_add(&v, s);
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
char *out = gpr_strvec_flatten(&v, NULL);
gpr_strvec_destroy(&v);
gpr_log(GPR_DEBUG, "%s", out);
gpr_free(out);
}
#else
-static void dump_pending_tags(grpc_completion_queue *cc) {}
+static void dump_pending_tags(grpc_completion_queue *cq) {}
#endif
-static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved) {
grpc_event ret;
gpr_timespec now;
- cq_data *cqd = &cc->data;
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
GRPC_API_TRACE(
"grpc_completion_queue_next("
- "cc=%p, "
+ "cq=%p, "
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+ 5, (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
reserved));
GPR_ASSERT(!reserved);
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- GRPC_CQ_INTERNAL_REF(cc, "next");
+ GRPC_CQ_INTERNAL_REF(cq, "next");
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
- .cq = cc,
+ .cq = cq,
.deadline = deadline,
.stolen_completion = NULL,
.tag = NULL,
@@ -800,21 +849,24 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
/* If c == NULL it means either the queue is empty OR in an transient
inconsistent state. If it is the latter, we shold do a 0-timeout poll
so that the thread comes back quickly from poll to make a second
- attempt at popping. Not doing this can potentially deadlock this thread
+ attempt at popping. Not doing this can potentially deadlock this
+ thread
forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
}
}
- if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
+ if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) {
/* Before returning, check if the queue has any items left over (since
gpr_mpscq_pop() can sometimes return NULL even if the queue is not
empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
/* Go to the beginning of the loop. No point doing a poll because
- (cc->shutdown == true) is only possible when there is no pending work
- (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion
+ (cq->shutdown == true) is only possible when there is no pending
+ work
+ (i.e cq->pending_events == 0) and any outstanding
+ grpc_cq_completion
events are already queued on this cq */
continue;
}
@@ -828,16 +880,16 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
/* The main polling work happens in grpc_pollset_work */
- gpr_mu_lock(cqd->mu);
- cqd->num_polls++;
- grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+ gpr_mu_lock(cq->mu);
+ cq->num_polls++;
+ grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
NULL, now, iteration_deadline);
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(err);
@@ -846,30 +898,74 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
is_finished_arg.first_loop = false;
}
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next");
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
+ if (cq_event_queue_num_items(&cqd->queue) > 0 &&
+ gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
+ gpr_mu_lock(cq->mu);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+ gpr_mu_unlock(cq->mu);
+ }
+
GPR_TIMER_END("grpc_completion_queue_next", 0);
return ret;
}
-grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
+/* Finishes the completion queue shutdown. This means that there are no more
+ completion events / tags expected from the completion queue
+ - Must be called under completion queue lock
+ - Must be called only once in completion queue's lifetime
+ - grpc_completion_queue_shutdown() MUST have been called before calling
+ this function */
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+ GPR_ASSERT(cqd->shutdown_called);
+ GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
+
+ cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+ &cq->pollset_shutdown_done);
+}
+
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ if (cqd->shutdown_called) {
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
+ return;
+ }
+ cqd->shutdown_called = 1;
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ cq_finish_shutdown_next(exec_ctx, cq);
+ }
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+}
+
+grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline, void *reserved) {
- return cc->vtable->next(cc, deadline, reserved);
+ return cq->vtable->next(cq, deadline, reserved);
}
-static int add_plucker(grpc_completion_queue *cc, void *tag,
+static int add_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) {
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
return 0;
}
@@ -879,9 +975,9 @@ static int add_plucker(grpc_completion_queue *cc, void *tag,
return 1;
}
-static void del_plucker(grpc_completion_queue *cc, void *tag,
+static void del_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) {
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
cqd->num_pluckers--;
@@ -895,13 +991,13 @@ static void del_plucker(grpc_completion_queue *cc, void *tag,
static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
- cq_data *cqd = &cq->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
- gpr_mu_lock(cqd->mu);
+ gpr_mu_lock(cq->mu);
a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
grpc_cq_completion *c;
@@ -913,51 +1009,51 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
if (c == cqd->completed_tail) {
cqd->completed_tail = prev;
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
a->stolen_completion = c;
return true;
}
prev = c;
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
}
return !a->first_loop &&
gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
-static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) {
GRPC_API_TRACE(
"grpc_completion_queue_pluck("
- "cc=%p, tag=%p, "
+ "cq=%p, tag=%p, "
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 6, (cc, tag, deadline.tv_sec, deadline.tv_nsec,
+ 6, (cq, tag, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, reserved));
}
GPR_ASSERT(!reserved);
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- GRPC_CQ_INTERNAL_REF(cc, "pluck");
- gpr_mu_lock(cqd->mu);
+ GRPC_CQ_INTERNAL_REF(cq, "pluck");
+ gpr_mu_lock(cq->mu);
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
- .cq = cc,
+ .cq = cq,
.deadline = deadline,
.stolen_completion = NULL,
.tag = tag,
@@ -966,7 +1062,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
c = is_finished_arg.stolen_completion;
is_finished_arg.stolen_completion = NULL;
ret.type = GRPC_OP_COMPLETE;
@@ -983,7 +1079,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
if (c == cqd->completed_tail) {
cqd->completed_tail = prev;
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
@@ -993,54 +1089,54 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
prev = c;
}
if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!add_plucker(cc, tag, &worker)) {
+ if (!add_plucker(cq, tag, &worker)) {
gpr_log(GPR_DEBUG,
"Too many outstanding grpc_completion_queue_pluck calls: maximum "
"is %d",
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
/* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
- del_plucker(cc, tag, &worker);
- gpr_mu_unlock(cqd->mu);
+ del_plucker(cq, tag, &worker);
+ gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
- cqd->num_polls++;
- grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+ cq->num_polls++;
+ grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
&worker, now, deadline);
if (err != GRPC_ERROR_NONE) {
- del_plucker(cc, tag, &worker);
- gpr_mu_unlock(cqd->mu);
+ del_plucker(cq, tag, &worker);
+ gpr_mu_unlock(cq->mu);
const char *msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
is_finished_arg.first_loop = false;
- del_plucker(cc, tag, &worker);
+ del_plucker(cq, tag, &worker);
}
done:
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck");
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck");
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
@@ -1049,85 +1145,66 @@ done:
return ret;
}
-grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
+grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved) {
- return cc->vtable->pluck(cc, tag, deadline, reserved);
+ return cq->vtable->pluck(cq, tag, deadline, reserved);
}
-/* Finishes the completion queue shutdown. This means that there are no more
- completion events / tags expected from the completion queue
- - Must be called under completion queue lock
- - Must be called only once in completion queue's lifetime
- - grpc_completion_queue_shutdown() MUST have been called before calling
- this function */
-static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc) {
- cq_data *cqd = &cc->data;
+static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
gpr_atm_no_barrier_store(&cqd->shutdown, 1);
- cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
- &cqd->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+ &cq->pollset_shutdown_done);
}
-/* Shutdown simply drops a ref that we reserved at creation time; if we drop
- to zero here, then enter shutdown mode and wake up any waiters */
-void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
- GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
- cq_data *cqd = &cc->data;
+static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
- gpr_mu_lock(cqd->mu);
+ gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
cqd->shutdown_called = 1;
if (gpr_unref(&cqd->pending_events)) {
- cq_finish_shutdown(&exec_ctx, cc);
+ cq_finish_shutdown_pluck(exec_ctx, cq);
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
+}
+
+/* Shutdown simply drops a ref that we reserved at creation time; if we drop
+ to zero here, then enter shutdown mode and wake up any waiters */
+void grpc_completion_queue_shutdown(grpc_completion_queue *cq) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
+ GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
+ cq->vtable->shutdown(&exec_ctx, cq);
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
-void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
- GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
+void grpc_completion_queue_destroy(grpc_completion_queue *cq) {
+ GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
- grpc_completion_queue_shutdown(cc);
-
- /* TODO (sreek): This should not ideally be here. Refactor it into the
- * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */
- if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) {
- GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0);
- }
+ grpc_completion_queue_shutdown(cq);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy");
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy");
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_destroy", 0);
}
-grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
- return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
-}
-
-grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
- return CQ_FROM_POLLSET(ps);
-}
-
-void grpc_cq_mark_server_cq(grpc_completion_queue *cc) {
- cc->data.is_server_cq = 1;
-}
-
-bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
- return cc->data.is_server_cq;
+grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) {
+ return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL;
}
-bool grpc_cq_can_listen(grpc_completion_queue *cc) {
- return cc->poller_vtable->can_listen;
+bool grpc_cq_can_listen(grpc_completion_queue *cq) {
+ return cq->poller_vtable->can_listen;
}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 97ea9cae20..af44482513 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -84,10 +84,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
void *done_arg, grpc_cq_completion *storage);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
-grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps);
-void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
-bool grpc_cq_is_server_cq(grpc_completion_queue *cc);
bool grpc_cq_can_listen(grpc_completion_queue *cc);
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 0cd436883a..de0a91e2b2 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -975,8 +975,6 @@ static void register_completion_queue(grpc_server *server,
if (server->cqs[i] == cq) return;
}
- grpc_cq_mark_server_cq(cq);
-
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
@@ -1156,9 +1154,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
chand->channel = channel;
size_t cq_idx;
- grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset);
for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
- if (s->cqs[cq_idx] == accepting_cq) break;
+ if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
}
if (cq_idx == s->cq_count) {
/* completion queue not found: pick a random one to publish new calls to */
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index bd7d4ad691..a871ea895a 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -27,8 +27,8 @@
* immediately, unless flow control prevents it.
* If it is throttled and keeps receiving values, as well as if it receives values before being
* started, it will buffer them and propagate them in order as soon as its state becomes Started.
- * If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
- * propagate the error immediately.
+ * If it receives an end of stream (via -writesFinishedWithError:), it will buffer the EOS after the
+ * last buffered value and issue it to the writeable after all buffered values are issued.
*
* Beware that a pipe of this type can't prevent receiving more values when it is paused (for
* example if used to write data to a congested network connection). Because in such situations the
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index e4a7cc40f9..99cb0ad971 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -18,11 +18,13 @@
#import "GRXBufferedPipe.h"
+@interface GRXBufferedPipe ()
+@property(atomic) id<GRXWriteable> writeable;
+@end
+
@implementation GRXBufferedPipe {
- id<GRXWriteable> _writeable;
- NSMutableArray *_queue;
- BOOL _inputIsFinished;
NSError *_errorOrNil;
+ dispatch_queue_t _writeQueue;
}
@synthesize state = _state;
@@ -33,99 +35,79 @@
- (instancetype)init {
if (self = [super init]) {
- _queue = [NSMutableArray array];
_state = GRXWriterStateNotStarted;
+ _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ dispatch_suspend(_writeQueue);
}
return self;
}
-- (id)popValue {
- id value = _queue[0];
- [_queue removeObjectAtIndex:0];
- return value;
-}
-
-- (void)writeBufferUntilPausedOrStopped {
- while (_state == GRXWriterStateStarted && _queue.count > 0) {
- [_writeable writeValue:[self popValue]];
- }
- if (_inputIsFinished && _queue.count == 0) {
- // Our writer finished normally while we were paused or not-started-yet.
- [self finishWithError:_errorOrNil];
- }
-}
-
#pragma mark GRXWriteable implementation
-// Returns whether events can be simply propagated to the other end of the pipe.
-- (BOOL)shouldFastForward {
- return _state == GRXWriterStateStarted && _queue.count == 0;
-}
-
- (void)writeValue:(id)value {
- if (self.shouldFastForward) {
- // Skip the queue.
- [_writeable writeValue:value];
- } else {
+ if ([value respondsToSelector:@selector(copy)]) {
// Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
// So just buffer the new value.
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
- if ([value respondsToSelector:@selector(copy)]) {
- value = [value copy];
- }
- [_queue addObject:value];
+ value = [value copy];
}
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^(void) {
+ [weakSelf.writeable writeValue:value];
+ });
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _inputIsFinished = YES;
- _errorOrNil = errorOrNil;
- if (errorOrNil || self.shouldFastForward) {
- // No need to write pending values.
- [self finishWithError:_errorOrNil];
- }
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^{
+ [weakSelf finishWithError:errorOrNil];
+ });
}
#pragma mark GRXWriter implementation
- (void)setState:(GRXWriterState)newState {
- // Manual transitions are only allowed from the started or paused states.
- if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
- return;
- }
-
- switch (newState) {
- case GRXWriterStateFinished:
- _state = newState;
- _queue = nil;
- // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
- // writeable to be messaged anymore.
- _writeable = nil;
- return;
- case GRXWriterStatePaused:
- _state = newState;
+ @synchronized (self) {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
- case GRXWriterStateStarted:
- if (_state == GRXWriterStatePaused) {
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
+ self.writeable = nil;
+ if (_state == GRXWriterStatePaused) {
+ dispatch_resume(_writeQueue);
+ }
_state = newState;
- [self writeBufferUntilPausedOrStopped];
- }
- return;
- case GRXWriterStateNotStarted:
- return;
+ return;
+ case GRXWriterStatePaused:
+ if (_state == GRXWriterStateStarted) {
+ _state = newState;
+ dispatch_suspend(_writeQueue);
+ }
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ dispatch_resume(_writeQueue);
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ self.writeable = writeable;
_state = GRXWriterStateStarted;
- _writeable = writeable;
- [self writeBufferUntilPausedOrStopped];
+ dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
- id<GRXWriteable> writeable = _writeable;
+ [self.writeable writesFinishedWithError:errorOrNil];
self.state = GRXWriterStateFinished;
- [writeable writesFinishedWithError:errorOrNil];
}
@end
diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m
index f152452b01..fa3ded4c0c 100644
--- a/src/objective-c/tests/RxLibraryUnitTests.m
+++ b/src/objective-c/tests/RxLibraryUnitTests.m
@@ -23,6 +23,8 @@
#import <RxLibrary/GRXWriteable.h>
#import <RxLibrary/GRXWriter.h>
+#define TEST_TIMEOUT 1
+
// A mock of a GRXSingleValueHandler block that can be queried for how many times it was called and
// what were the last values passed to it.
//
@@ -140,26 +142,38 @@
#pragma mark BufferedPipe
- (void)testBufferedPipePropagatesValue {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
+
id anyValue = @7;
// If:
GRXBufferedPipe *pipe = [GRXBufferedPipe pipe];
[pipe startWithWriteable:writeable];
[pipe writeValue:anyValue];
+ [pipe writesFinishedWithError:nil];
// Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, anyValue);
XCTAssertEqualObjects(handler.errorOrNil, nil);
+
}
- (void)testBufferedPipePropagatesError {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil];
// If:
@@ -168,15 +182,20 @@
[pipe writesFinishedWithError:anyError];
// Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, nil);
XCTAssertEqualObjects(handler.errorOrNil, anyError);
}
- (void)testBufferedPipeFinishWriteWhilePaused {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
id anyValue = @7;
// If:
@@ -188,6 +207,7 @@
[pipe startWithWriteable:writeable];
// Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, anyValue);
XCTAssertEqualObjects(handler.errorOrNil, nil);
diff --git a/templates/package.json.template b/templates/package.json.template
index af13d528db..50893d3a54 100644
--- a/templates/package.json.template
+++ b/templates/package.json.template
@@ -58,7 +58,7 @@
},
"binary": {
"module_name": "grpc_node",
- "module_path": "src/node/extension_binary",
+ "module_path": "src/node/extension_binary/{node_abi}-{platform}-{arch}",
"host": "https://storage.googleapis.com/",
"remote_path": "grpc-precompiled-binaries/node/{name}/v{version}",
"package_name": "{node_abi}-{platform}-{arch}.tar.gz"
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index c27337aaa8..f9d88d6327 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -93,7 +93,7 @@ static void test_pollset_conversion(void) {
attr.cq_polling_type = polling_types[j];
cq = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, NULL);
- GPR_ASSERT(grpc_cq_from_pollset(grpc_cq_pollset(cq)) == cq);
+ GPR_ASSERT(grpc_cq_pollset(cq) != NULL);
shutdown_and_destroy(cq);
}
}
diff --git a/tools/internal_ci/linux/grpc_interop_tocloud.cfg b/tools/internal_ci/linux/grpc_interop_tocloud.cfg
index 1f6421c83d..0c31b497af 100644
--- a/tools/internal_ci/linux/grpc_interop_tocloud.cfg
+++ b/tools/internal_ci/linux/grpc_interop_tocloud.cfg
@@ -20,6 +20,6 @@ build_file: "grpc/tools/internal_ci/linux/grpc_interop_tocloud.sh"
timeout_mins: 480
action {
define_artifacts {
- regex: "**/report.xml"
+ regex: "**/sponge_log.xml"
}
}
diff --git a/tools/internal_ci/linux/grpc_interop_tocloud.sh b/tools/internal_ci/linux/grpc_interop_tocloud.sh
index fe5c9a5130..e3ba25af5d 100755
--- a/tools/internal_ci/linux/grpc_interop_tocloud.sh
+++ b/tools/internal_ci/linux/grpc_interop_tocloud.sh
@@ -23,4 +23,4 @@ cd $(dirname $0)/../../..
source tools/internal_ci/helper_scripts/prepare_build_linux_rc
source tools/internal_ci/helper_scripts/prepare_build_interop_rc
-tools/run_tests/run_interop_tests.py -l all -s all --use_docker --http2_interop -t -j 12 $@
+tools/run_tests/run_interop_tests.py -l all -s all --use_docker --http2_interop --internal_ci -t -j 12 $@
diff --git a/tools/internal_ci/linux/grpc_interop_toprod.cfg b/tools/internal_ci/linux/grpc_interop_toprod.cfg
new file mode 100644
index 0000000000..18978b87e5
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_interop_toprod.cfg
@@ -0,0 +1,26 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/grpc_interop_toprod.sh"
+# grpc_interop tests can take 6+ hours to complete.
+timeout_mins: 60
+action {
+ define_artifacts {
+ regex: "**/sponge_log.xml"
+ }
+}
+
diff --git a/tools/internal_ci/linux/grpc_interop_toprod.sh b/tools/internal_ci/linux/grpc_interop_toprod.sh
new file mode 100644
index 0000000000..3d06185406
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_interop_toprod.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -ex
+
+export LANG=en_US.UTF-8
+
+# Enter the gRPC repo root
+cd $(dirname $0)/../../..
+
+source tools/internal_ci/helper_scripts/prepare_build_linux_rc
+source tools/internal_ci/helper_scripts/prepare_build_interop_rc
+
+tools/run_tests/run_interop_tests.py \
+ -l all \
+ --cloud_to_prod \
+ --cloud_to_prod_auth \
+ --prod_servers default gateway_v4 \
+ --use_docker --internal_ci -t -j 12 $@
+
diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py
index 80062aa37d..1e702a8636 100755
--- a/tools/run_tests/run_interop_tests.py
+++ b/tools/run_tests/run_interop_tests.py
@@ -63,6 +63,13 @@ _TEST_TIMEOUT = 3*60
# see https://github.com/grpc/grpc/issues/9779
_SKIP_DATA_FRAME_PADDING = ['data_frame_padding']
+# report suffix is important for reports to get picked up by internal CI
+_INTERNAL_CL_XML_REPORT = 'sponge_log.xml'
+
+# report suffix is important for reports to get picked up by internal CI
+_XML_REPORT = 'report.xml'
+
+
class CXXLanguage:
def __init__(self):
@@ -943,7 +950,12 @@ argp.add_argument('--insecure',
action='store_const',
const=True,
help='Whether to use secure channel.')
-
+argp.add_argument('--internal_ci',
+ default=False,
+ action='store_const',
+ const=True,
+ help=('Put reports into subdirectories to improve '
+ 'presentation of results by Internal CI.'))
args = argp.parse_args()
servers = set(s for s in itertools.chain.from_iterable(_SERVERS
@@ -1201,7 +1213,10 @@ try:
write_cmdlog_maybe(server_manual_cmd_log, 'interop_server_cmds.sh')
write_cmdlog_maybe(client_manual_cmd_log, 'interop_client_cmds.sh')
- report_utils.render_junit_xml_report(resultset, 'report.xml')
+ xml_report_name = _XML_REPORT
+ if args.internal_ci:
+ xml_report_name = _INTERNAL_CL_XML_REPORT
+ report_utils.render_junit_xml_report(resultset, xml_report_name)
for name, job in resultset.items():
if "http2" in name: