-rw-r--r--  package.json                                                             |   2
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c |   2
-rw-r--r--  src/core/lib/iomgr/ev_epollex_linux.c                                    |  56
-rw-r--r--  src/core/lib/iomgr/timer_generic.c                                       |  68
-rw-r--r--  src/core/lib/support/log_linux.c                                         |   4
-rw-r--r--  src/core/lib/surface/completion_queue.c                                  | 633
-rw-r--r--  src/core/lib/surface/completion_queue.h                                  |   3
-rw-r--r--  src/core/lib/surface/server.c                                            |   5
-rw-r--r--  src/objective-c/RxLibrary/GRXBufferedPipe.h                              |   4
-rw-r--r--  src/objective-c/RxLibrary/GRXBufferedPipe.m                              | 112
-rw-r--r--  src/objective-c/tests/RxLibraryUnitTests.m                               |  26
-rw-r--r--  templates/package.json.template                                          |   2
-rw-r--r--  test/core/surface/completion_queue_test.c                                |   2
-rw-r--r--  tools/internal_ci/linux/grpc_interop_tocloud.cfg                         |   2
-rwxr-xr-x  tools/internal_ci/linux/grpc_interop_tocloud.sh                          |   2
-rw-r--r--  tools/internal_ci/linux/grpc_interop_toprod.cfg                          |  26
-rw-r--r--  tools/internal_ci/linux/grpc_interop_toprod.sh                           |  32
-rwxr-xr-x  tools/run_tests/run_interop_tests.py                                     |  19
18 files changed, 608 insertions(+), 392 deletions(-)
diff --git a/package.json b/package.json
index d5eec72fc9..b4b16635c6 100644
--- a/package.json
+++ b/package.json
@@ -56,7 +56,7 @@
   },
   "binary": {
     "module_name": "grpc_node",
-    "module_path": "src/node/extension_binary",
+    "module_path": "src/node/extension_binary/{node_abi}-{platform}-{arch}",
    "host": "https://storage.googleapis.com/",
     "remote_path": "grpc-precompiled-binaries/node/{name}/v{version}",
     "package_name": "{node_abi}-{platform}-{arch}.tar.gz"
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 56d340b8c2..fbef79ec31 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -158,6 +158,7 @@ static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
     if (sd->user_data != NULL) {
       GPR_ASSERT(sd->user_data_vtable != NULL);
       sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+      sd->user_data = NULL;
     }
   }
   gpr_free(subchannel_list->subchannels);
@@ -578,6 +579,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
     if (sd->user_data != NULL) {
       GPR_ASSERT(sd->user_data_vtable != NULL);
       sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+      sd->user_data = NULL;
     }
     if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
       /* the policy is shutting down. Flush all the pending picks... */
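Both round_robin.c hunks clear sd->user_data immediately after invoking its destroy vtable, so that whichever of the two cleanup paths runs second sees NULL instead of a stale pointer. A minimal sketch of the pattern outside gRPC, with hypothetical names:

#include <stddef.h>

typedef struct {
  void (*destroy)(void *p);
} user_data_vtable;

/* Destroy through the vtable, then clear the slot: a later cleanup pass
   (e.g. both a connectivity callback and a list destructor) finds NULL and
   becomes a harmless no-op instead of a double-destroy. */
static void release_user_data(void **slot, const user_data_vtable *vt) {
  if (*slot != NULL) {
    vt->destroy(*slot);
    *slot = NULL;
  }
}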
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 5574838187..5690431759 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -103,6 +103,32 @@ typedef struct pollable {
   grpc_pollset_worker *root_worker;
 } pollable;
 
+static const char *polling_obj_type_string(polling_obj_type t) {
+  switch (t) {
+    case PO_POLLING_GROUP:
+      return "polling_group";
+    case PO_POLLSET_SET:
+      return "pollset_set";
+    case PO_POLLSET:
+      return "pollset";
+    case PO_FD:
+      return "fd";
+    case PO_EMPTY_POLLABLE:
+      return "empty_pollable";
+    case PO_COUNT:
+      return "<invalid:count>";
+  }
+  return "<invalid>";
+}
+
+static char *pollable_desc(pollable *p) {
+  char *out;
+  gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
+               polling_obj_type_string(p->po.type), p->po.group, p->epfd,
+               p->wakeup.read_fd);
+  return out;
+}
+
 static pollable g_empty_pollable;
 
 static void pollable_init(pollable *p, polling_obj_type type);
@@ -472,7 +498,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
   GPR_ASSERT(epfd != -1);
 
   if (GRPC_TRACER_ON(grpc_polling_trace)) {
-    gpr_log(GPR_DEBUG, "add fd %p to pollable %p", fd, p);
+    gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
   }
 
   gpr_mu_lock(&fd->orphaned_mu);
@@ -537,10 +563,18 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
       if (worker->pollable != &pollset->pollable) {
         gpr_mu_lock(&worker->pollable->po.mu);
       }
-      if (worker->initialized_cv) {
+      if (worker->initialized_cv && worker != pollset->root_worker) {
+        if (GRPC_TRACER_ON(grpc_polling_trace)) {
+          gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
+                  pollset, worker, &pollset->pollable, worker->pollable);
+        }
         worker->kicked = true;
         gpr_cv_signal(&worker->cv);
       } else {
+        if (GRPC_TRACER_ON(grpc_polling_trace)) {
+          gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
+                  pollset, worker, &pollset->pollable, worker->pollable);
+        }
         append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup),
                      "pollset_shutdown");
       }
@@ -770,7 +804,9 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   int timeout = poll_deadline_to_millis_timeout(deadline, now);
 
   if (GRPC_TRACER_ON(grpc_polling_trace)) {
-    gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout);
+    char *desc = pollable_desc(p);
+    gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout);
+    gpr_free(desc);
   }
 
   if (timeout != 0) {
@@ -985,10 +1021,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
   static const char *err_desc = "pollset_add_fd";
   grpc_error *error = GRPC_ERROR_NONE;
   if (pollset->current_pollable == &g_empty_pollable) {
-    if (GRPC_TRACER_ON(grpc_polling_trace))
+    if (GRPC_TRACER_ON(grpc_polling_trace)) {
       gpr_log(GPR_DEBUG,
               "PS:%p add fd %p; transition pollable from empty to fd", pollset,
               fd);
+    }
     /* empty pollable --> single fd pollable */
     pollset_kick_all(exec_ctx, pollset);
     pollset->current_pollable = &fd->pollable;
@@ -997,16 +1034,23 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
     if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu);
     REF_BY(fd, 2, "pollset_pollable");
   } else if (pollset->current_pollable == &pollset->pollable) {
-    if (GRPC_TRACER_ON(grpc_polling_trace))
+    if (GRPC_TRACER_ON(grpc_polling_trace)) {
       gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd);
+    }
     append_error(&error, pollable_add_fd(pollset->current_pollable, fd),
                  err_desc);
   } else if (pollset->current_pollable != &fd->pollable) {
     grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable;
-    if (GRPC_TRACER_ON(grpc_polling_trace))
+    if (GRPC_TRACER_ON(grpc_polling_trace)) {
       gpr_log(GPR_DEBUG,
               "PS:%p add fd %p; transition pollable from fd %p to multipoller",
               pollset, fd, had_fd);
+    }
+    /* Introduce a spurious completion.
+       If we do not, then it may be that the fd-specific epoll set consumed
+       a completion without being polled, leading to a missed edge going up. */
+    grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure);
+    grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure);
     pollset_kick_all(exec_ctx, pollset);
     pollset->current_pollable = &pollset->pollable;
     if (append_error(&error, pollable_materialize(&pollset->pollable),
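The new pollable_desc helper above builds its description with gpr_asprintf only when tracing is on, and the caller frees it right after logging. The same allocate-log-free shape in plain libc, assuming the GNU asprintf extension (not gRPC code):

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>

/* Build a heap-allocated description lazily, so the formatting cost is only
   paid when the trace line is actually emitted. */
static char *describe(int epfd, int wakeup_fd) {
  char *out = NULL;
  if (asprintf(&out, "epfd=%d wakeup=%d", epfd, wakeup_fd) < 0) return NULL;
  return out;
}

int main(void) {
  char *desc = describe(7, 8);
  if (desc != NULL) {
    fprintf(stderr, "POLLABLE [%s]\n", desc);
    free(desc); /* mirrors gpr_free(desc) after gpr_log above */
  }
  return 0;
}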
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index bf73d2c685..e6a9eb0e86 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -44,41 +44,63 @@
 grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false);
 grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false);
 
+/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
+ * deadlines earlier than 'queue_deadline_cap' are maintained in the heap and
+ * others are maintained in the list (unordered). This helps to keep the
+ * number of elements in the heap low.
+ *
+ * The 'queue_deadline_cap' gets recomputed periodically based on the timer
+ * stats maintained in 'stats' and the relevant timers are then moved from the
+ * 'list' to the 'heap'.
+ */
 typedef struct {
   gpr_mu mu;
   grpc_time_averaged_stats stats;
   /* All and only timers with deadlines <= this will be in the heap. */
   gpr_atm queue_deadline_cap;
+  /* The deadline of the next timer due in this shard */
   gpr_atm min_deadline;
-  /* Index in the g_shard_queue */
+  /* Index of this timer_shard in the g_shard_queue */
   uint32_t shard_queue_index;
   /* This holds all timers with deadlines < queue_deadline_cap. Timers in this
      list have the top bit of their deadline set to 0. */
   grpc_timer_heap heap;
   /* This holds timers whose deadline is >= queue_deadline_cap. */
   grpc_timer list;
-} shard_type;
+} timer_shard;
+
+/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its
+ * address is hashed to select the timer shard to add the timer to. */
+static timer_shard g_shards[NUM_SHARDS];
+
+/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e.
+ * the deadline of the next timer in each shard).
+ * Access to this is protected by g_shared_mutables.mu. */
+static timer_shard *g_shard_queue[NUM_SHARDS];
+
+/* Thread-local variable that stores the deadline of the next timer the thread
+ * has last seen. This is an optimization to prevent the thread from checking
+ * shared_mutables.min_timer (which requires acquiring the shared_mutables.mu
+ * lock, an expensive operation). */
+GPR_TLS_DECL(g_last_seen_min_timer);
 
 struct shared_mutables {
+  /* The deadline of the next timer due across all timer shards */
   gpr_atm min_timer;
   /* Allow only one run_some_expired_timers at once */
   gpr_spinlock checker_mu;
   bool initialized;
-  /* Protects g_shard_queue */
+  /* Protects g_shard_queue (and the shared_mutables struct itself) */
   gpr_mu mu;
 } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
 
 static struct shared_mutables g_shared_mutables = {
     .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false,
 };
+
 static gpr_clock_type g_clock_type;
-static shard_type g_shards[NUM_SHARDS];
-/* Protected by g_shared_mutables.mu */
-static shard_type *g_shard_queue[NUM_SHARDS];
 static gpr_timespec g_start_time;
 
-GPR_TLS_DECL(g_last_seen_min_timer);
-
 static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
   if (a > GPR_ATM_MAX - b) {
     return GPR_ATM_MAX;
@@ -122,7 +144,7 @@ static gpr_timespec atm_to_timespec(gpr_atm x) {
   return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0));
 }
 
-static gpr_atm compute_min_deadline(shard_type *shard) {
+static gpr_atm compute_min_deadline(timer_shard *shard) {
   return grpc_timer_heap_is_empty(&shard->heap)
              ? saturating_add(shard->queue_deadline_cap, 1)
             : grpc_timer_heap_top(&shard->heap)->deadline;
@@ -142,7 +164,7 @@ void grpc_timer_list_init(gpr_timespec now) {
   grpc_register_tracer("timer_check", &grpc_timer_check_trace);
 
   for (i = 0; i < NUM_SHARDS; i++) {
-    shard_type *shard = &g_shards[i];
+    timer_shard *shard = &g_shards[i];
     gpr_mu_init(&shard->mu);
     grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
                                   0.5);
@@ -161,7 +183,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
       exec_ctx, GPR_ATM_MAX, NULL,
       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
   for (i = 0; i < NUM_SHARDS; i++) {
-    shard_type *shard = &g_shards[i];
+    timer_shard *shard = &g_shards[i];
     gpr_mu_destroy(&shard->mu);
     grpc_timer_heap_destroy(&shard->heap);
   }
@@ -187,7 +209,7 @@ static void list_remove(grpc_timer *timer) {
 }
 
 static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
-  shard_type *temp;
+  timer_shard *temp;
   temp = g_shard_queue[first_shard_queue_index];
   g_shard_queue[first_shard_queue_index] =
       g_shard_queue[first_shard_queue_index + 1];
@@ -198,7 +220,7 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
       first_shard_queue_index + 1;
 }
 
-static void note_deadline_change(shard_type *shard) {
+static void note_deadline_change(timer_shard *shard) {
   while (shard->shard_queue_index > 0 &&
          shard->min_deadline <
             g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
@@ -215,7 +237,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
                      gpr_timespec deadline, grpc_closure *closure,
                      gpr_timespec now) {
   int is_first_timer = 0;
-  shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+  timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
   GPR_ASSERT(deadline.clock_type == g_clock_type);
   GPR_ASSERT(now.clock_type == g_clock_type);
   timer->closure = closure;
@@ -303,7 +325,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
     return;
   }
 
-  shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+  timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
   gpr_mu_lock(&shard->mu);
   if (GRPC_TRACER_ON(grpc_timer_trace)) {
     gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
@@ -321,12 +343,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
   gpr_mu_unlock(&shard->mu);
 }
 
-/* This is called when the queue is empty and "now" has reached the
-   queue_deadline_cap. We compute a new queue deadline and then scan the map
-   for timers that fall at or under it. Returns true if the queue is no
-   longer empty.
+/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and
+   moving all relevant timers in shard->list (i.e. timers with deadlines
+   earlier than 'queue_deadline_cap') into shard->heap.
+   Returns 'true' if shard->heap has at least ONE element.
    REQUIRES: shard->mu locked */
-static int refill_queue(shard_type *shard, gpr_atm now) {
+static int refill_heap(timer_shard *shard, gpr_atm now) {
   /* Compute the new queue window width and bound by the limits: */
   double computed_deadline_delta =
       grpc_time_averaged_stats_update_average(&shard->stats) *
@@ -363,7 +385,7 @@ static int refill_queue(shard_type *shard, gpr_atm now) {
 
 /* This pops the next non-cancelled timer with deadline <= now from the
    queue, or returns NULL if there isn't one.
    REQUIRES: shard->mu locked */
-static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
+static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) {
   grpc_timer *timer;
   for (;;) {
     if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@@ -373,7 +395,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
     }
     if (grpc_timer_heap_is_empty(&shard->heap)) {
       if (now < shard->queue_deadline_cap) return NULL;
-      if (!refill_queue(shard, now)) return NULL;
+      if (!refill_heap(shard, now)) return NULL;
     }
     timer = grpc_timer_heap_top(&shard->heap);
     if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@@ -393,7 +415,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
 }
 
 /* REQUIRES: shard->mu unlocked */
-static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
+static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard,
                          gpr_atm now, gpr_atm *new_min_deadline,
                          grpc_error *error) {
   size_t n = 0;
REQUIRES: shard->mu locked */ -static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { +static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) { grpc_timer *timer; for (;;) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { @@ -373,7 +395,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { } if (grpc_timer_heap_is_empty(&shard->heap)) { if (now < shard->queue_deadline_cap) return NULL; - if (!refill_queue(shard, now)) return NULL; + if (!refill_heap(shard, now)) return NULL; } timer = grpc_timer_heap_top(&shard->heap); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { @@ -393,7 +415,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { } /* REQUIRES: shard->mu unlocked */ -static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, +static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, gpr_atm now, gpr_atm *new_min_deadline, grpc_error *error) { size_t n = 0; diff --git a/src/core/lib/support/log_linux.c b/src/core/lib/support/log_linux.c index 5c512661a3..61d2346427 100644 --- a/src/core/lib/support/log_linux.c +++ b/src/core/lib/support/log_linux.c @@ -64,6 +64,8 @@ void gpr_default_log(gpr_log_func_args *args) { time_t timer; gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); struct tm tm; + static __thread long tid = 0; + if (tid == 0) tid = gettid(); timer = (time_t)now.tv_sec; final_slash = strrchr(args->file, '/'); @@ -81,7 +83,7 @@ void gpr_default_log(gpr_log_func_args *args) { gpr_asprintf(&prefix, "%s%s.%09" PRId32 " %7ld %s:%d]", gpr_log_severity_string(args->severity), time_buffer, - now.tv_nsec, gettid(), display_file, args->line); + now.tv_nsec, tid, display_file, args->line); fprintf(stderr, "%-60s %s\n", prefix, args->message); gpr_free(prefix); diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b04aee6c73..14e55eda85 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -189,16 +189,19 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; - size_t (*size)(); - void (*begin_op)(grpc_completion_queue *cc, void *tag); - void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, + size_t data_size; + void (*init)(void *data); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); + void (*destroy)(void *data); + void (*begin_op)(grpc_completion_queue *cq, void *tag); + void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); - grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline, + grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved); - grpc_event (*pluck)(grpc_completion_queue *cc, void *tag, + grpc_event (*pluck)(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved); } cq_vtable; @@ -218,25 +221,28 @@ typedef struct grpc_cq_event_queue { gpr_atm num_queue_items; } grpc_cq_event_queue; -/* TODO: sreek Refactor this based on the completion_type. 
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index b04aee6c73..14e55eda85 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -189,16 +189,19 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
 
 typedef struct cq_vtable {
   grpc_cq_completion_type cq_completion_type;
-  size_t (*size)();
-  void (*begin_op)(grpc_completion_queue *cc, void *tag);
-  void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag,
+  size_t data_size;
+  void (*init)(void *data);
+  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
+  void (*destroy)(void *data);
+  void (*begin_op)(grpc_completion_queue *cq, void *tag);
+  void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
                  grpc_error *error,
                  void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
                               grpc_cq_completion *storage),
                  void *done_arg, grpc_cq_completion *storage);
-  grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline,
+  grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline,
                      void *reserved);
-  grpc_event (*pluck)(grpc_completion_queue *cc, void *tag,
+  grpc_event (*pluck)(grpc_completion_queue *cq, void *tag,
                       gpr_timespec deadline, void *reserved);
 } cq_vtable;
 
@@ -218,25 +221,28 @@ typedef struct grpc_cq_event_queue {
   gpr_atm num_queue_items;
 } grpc_cq_event_queue;
 
-/* TODO: sreek Refactor this based on the completion_type. Put completion-type
- * specific data in a different structure (and co-allocate memory for it along
- * with completion queue + pollset )*/
-typedef struct cq_data {
-  gpr_mu *mu;
+typedef struct cq_next_data {
+  /** Completed events for completion-queues of type GRPC_CQ_NEXT */
+  grpc_cq_event_queue queue;
 
+  /** Counter of how many things have ever been queued on this completion
+      queue; useful for avoiding locks to check the queue */
+  gpr_atm things_queued_ever;
+
+  /* Number of outstanding events (+1 if not shut down) */
+  gpr_atm pending_events;
+
+  int shutdown_called;
+} cq_next_data;
+
+typedef struct cq_pluck_data {
   /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
   grpc_cq_completion completed_head;
   grpc_cq_completion *completed_tail;
 
-  /** Completed events for completion-queues of type GRPC_CQ_NEXT */
-  grpc_cq_event_queue queue;
-
   /** Number of pending events (+1 if we're not shutdown) */
   gpr_refcount pending_events;
 
-  /** Once owning_refs drops to zero, we will destroy the cq */
-  gpr_refcount owning_refs;
-
   /** Counter of how many things have ever been queued on this completion
       queue; useful for avoiding locks to check the queue */
   gpr_atm things_queued_ever;
@@ -245,37 +251,45 @@ typedef struct cq_data {
   gpr_atm shutdown;
   int shutdown_called;
 
-  int is_server_cq;
-
   int num_pluckers;
-  int num_polls;
   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
-  grpc_closure pollset_shutdown_done;
+} cq_pluck_data;
+
+/* Completion queue structure */
+struct grpc_completion_queue {
+  /** Once owning_refs drops to zero, we will destroy the cq */
+  gpr_refcount owning_refs;
+
+  gpr_mu *mu;
+
+  const cq_vtable *vtable;
+  const cq_poller_vtable *poller_vtable;
 
 #ifndef NDEBUG
   void **outstanding_tags;
   size_t outstanding_tag_count;
   size_t outstanding_tag_capacity;
 #endif
-} cq_data;
 
-/* Completion queue structure */
-struct grpc_completion_queue {
-  cq_data data;
-  const cq_vtable *vtable;
-  const cq_poller_vtable *poller_vtable;
+  grpc_closure pollset_shutdown_done;
+  int num_polls;
 };
 
 /* Forward declarations */
-static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
-                               grpc_completion_queue *cc);
-
-static size_t cq_size(grpc_completion_queue *cc);
-
-static void cq_begin_op(grpc_completion_queue *cc, void *tag);
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+                                    grpc_completion_queue *cq);
+static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+                                     grpc_completion_queue *cq);
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+                             grpc_completion_queue *cq);
+static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+                              grpc_completion_queue *cq);
+
+static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
 
 static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
-                               grpc_completion_queue *cc, void *tag,
+                               grpc_completion_queue *cq, void *tag,
                                grpc_error *error,
                                void (*done)(grpc_exec_ctx *exec_ctx,
                                             void *done_arg,
@@ -283,39 +297,51 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
                                void *done_arg, grpc_cq_completion *storage);
 
 static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
-                                grpc_completion_queue *cc, void *tag,
+                                grpc_completion_queue *cq, void *tag,
                                 grpc_error *error,
                                 void (*done)(grpc_exec_ctx *exec_ctx,
                                              void *done_arg,
                                              grpc_cq_completion *storage),
                                 void *done_arg, grpc_cq_completion *storage);
 
-static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
                           void *reserved);
 
-static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
                            gpr_timespec deadline, void *reserved);
 
+static void cq_init_next(void *data);
+static void cq_init_pluck(void *data);
+static void cq_destroy_next(void *data);
+static void cq_destroy_pluck(void *data);
+
 /* Completion queue vtables based on the completion-type */
 static const cq_vtable g_cq_vtable[] = {
     /* GRPC_CQ_NEXT */
-    {.cq_completion_type = GRPC_CQ_NEXT,
-     .size = cq_size,
-     .begin_op = cq_begin_op,
+    {.data_size = sizeof(cq_next_data),
+     .cq_completion_type = GRPC_CQ_NEXT,
+     .init = cq_init_next,
+     .shutdown = cq_shutdown_next,
+     .destroy = cq_destroy_next,
+     .begin_op = cq_begin_op_for_next,
      .end_op = cq_end_op_for_next,
      .next = cq_next,
      .pluck = NULL},
     /* GRPC_CQ_PLUCK */
-    {.cq_completion_type = GRPC_CQ_PLUCK,
-     .size = cq_size,
-     .begin_op = cq_begin_op,
+    {.data_size = sizeof(cq_pluck_data),
+     .cq_completion_type = GRPC_CQ_PLUCK,
+     .init = cq_init_pluck,
+     .shutdown = cq_shutdown_pluck,
+     .destroy = cq_destroy_pluck,
+     .begin_op = cq_begin_op_for_pluck,
      .end_op = cq_end_op_for_pluck,
      .next = NULL,
      .pluck = cq_pluck},
 };
 
-#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
-#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1)
+#define DATA_FROM_CQ(cq) ((void *)(cq + 1))
+#define POLLSET_FROM_CQ(cq) \
+  ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq)))
 
 grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true);
 grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
@@ -329,7 +355,7 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
     gpr_free(_ev);                    \
   }
 
-static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
+static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq,
                                      grpc_error *error);
 
 static void cq_event_queue_init(grpc_cq_event_queue *q) {
@@ -342,9 +368,9 @@ static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
   gpr_mpscq_destroy(&q->queue);
 }
 
-static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
+static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
   gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
-  gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1);
+  return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
 }
 
 static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
@@ -367,16 +393,10 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
   return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
 }
 
-static size_t cq_size(grpc_completion_queue *cc) {
-  /* Size of the completion queue and the size of the pollset whose memory is
-     allocated right after that of completion queue */
-  return sizeof(grpc_completion_queue) + cc->poller_vtable->size();
-}
-
 grpc_completion_queue *grpc_completion_queue_create_internal(
     grpc_cq_completion_type completion_type,
     grpc_cq_polling_type polling_type) {
-  grpc_completion_queue *cc;
+  grpc_completion_queue *cq;
 
   GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
 
@@ -389,158 +409,173 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
   const cq_poller_vtable *poller_vtable =
       &g_poller_vtable_by_poller_type[polling_type];
 
-  cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
-  cq_data *cqd = &cc->data;
+  cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
+                  poller_vtable->size());
 
-  cc->vtable = vtable;
-  cc->poller_vtable = poller_vtable;
+  cq->vtable = vtable;
+  cq->poller_vtable = poller_vtable;
 
-  poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu);
+  /* One for destroy(), one for pollset_shutdown */
+  gpr_ref_init(&cq->owning_refs, 2);
 
-#ifndef NDEBUG
-  cqd->outstanding_tags = NULL;
-  cqd->outstanding_tag_capacity = 0;
-#endif
+  poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
+  vtable->init(DATA_FROM_CQ(cq));
+
+  GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
+                    grpc_schedule_on_exec_ctx);
+
+  GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
+
+  return cq;
+}
+
+static void cq_init_next(void *ptr) {
+  cq_next_data *cqd = ptr;
+  /* Initial ref is dropped by grpc_completion_queue_shutdown */
+  gpr_atm_no_barrier_store(&cqd->pending_events, 1);
+  cqd->shutdown_called = false;
+  gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
+  cq_event_queue_init(&cqd->queue);
+}
+
+static void cq_destroy_next(void *ptr) {
+  cq_next_data *cqd = ptr;
+  GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
+  cq_event_queue_destroy(&cqd->queue);
+}
+
+static void cq_init_pluck(void *ptr) {
+  cq_pluck_data *cqd = ptr;
   /* Initial ref is dropped by grpc_completion_queue_shutdown */
   gpr_ref_init(&cqd->pending_events, 1);
-  /* One for destroy(), one for pollset_shutdown */
-  gpr_ref_init(&cqd->owning_refs, 2);
   cqd->completed_tail = &cqd->completed_head;
   cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
   gpr_atm_no_barrier_store(&cqd->shutdown, 0);
   cqd->shutdown_called = 0;
-  cqd->is_server_cq = 0;
   cqd->num_pluckers = 0;
-  cqd->num_polls = 0;
   gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
-#ifndef NDEBUG
-  cqd->outstanding_tag_count = 0;
-#endif
-  cq_event_queue_init(&cqd->queue);
-  GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc,
-                    grpc_schedule_on_exec_ctx);
-
-  GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
+}
 
-  return cc;
+static void cq_destroy_pluck(void *ptr) {
+  cq_pluck_data *cqd = ptr;
+  GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
 }
 
-grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
-  return cc->vtable->cq_completion_type;
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) {
+  return cq->vtable->cq_completion_type;
 }
 
-int grpc_get_cq_poll_num(grpc_completion_queue *cc) {
+int grpc_get_cq_poll_num(grpc_completion_queue *cq) {
   int cur_num_polls;
-  gpr_mu_lock(cc->data.mu);
-  cur_num_polls = cc->data.num_polls;
-  gpr_mu_unlock(cc->data.mu);
+  gpr_mu_lock(cq->mu);
+  cur_num_polls = cq->num_polls;
+  gpr_mu_unlock(cq->mu);
   return cur_num_polls;
 }
 
 #ifndef NDEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason,
                           const char *file, int line) {
-  cq_data *cqd = &cc->data;
   if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
-    gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
+    gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-            "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val + 1,
+            "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
             reason);
   }
 #else
-void grpc_cq_internal_ref(grpc_completion_queue *cc) {
-  cq_data *cqd = &cc->data;
+void grpc_cq_internal_ref(grpc_completion_queue *cq) {
 #endif
-  gpr_ref(&cqd->owning_refs);
+  gpr_ref(&cq->owning_refs);
 }
 
 static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error) {
-  grpc_completion_queue *cc = arg;
-  GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy");
+  grpc_completion_queue *cq = arg;
+  GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
 }
 
 #ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
                             const char *reason, const char *file, int line) {
-  cq_data *cqd = &cc->data;
   if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
-    gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
+    gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-            "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val - 1,
+            "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
             reason);
   }
 #else
 void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
-                            grpc_completion_queue *cc) {
-  cq_data *cqd = &cc->data;
+                            grpc_completion_queue *cq) {
 #endif
-  if (gpr_unref(&cqd->owning_refs)) {
-    GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
-    cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc));
-    cq_event_queue_destroy(&cqd->queue);
+  if (gpr_unref(&cq->owning_refs)) {
+    cq->vtable->destroy(DATA_FROM_CQ(cq));
+    cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq));
 #ifndef NDEBUG
-    gpr_free(cqd->outstanding_tags);
+    gpr_free(cq->outstanding_tags);
 #endif
-    gpr_free(cc);
+    gpr_free(cq);
   }
 }
 
-static void cq_begin_op(grpc_completion_queue *cc, void *tag) {
-  cq_data *cqd = &cc->data;
-#ifndef NDEBUG
-  gpr_mu_lock(cqd->mu);
+static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
+  GPR_ASSERT(!cqd->shutdown_called);
+  gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
+}
+
+static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
   GPR_ASSERT(!cqd->shutdown_called);
-  if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) {
-    cqd->outstanding_tag_capacity =
-        GPR_MAX(4, 2 * cqd->outstanding_tag_capacity);
-    cqd->outstanding_tags =
-        gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) *
-                                               cqd->outstanding_tag_capacity);
-  }
-  cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag;
-  gpr_mu_unlock(cqd->mu);
-#endif
   gpr_ref(&cqd->pending_events);
 }
 
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
-  cc->vtable->begin_op(cc, tag);
+void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+  gpr_mu_lock(cq->mu);
+  if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+    cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+    cq->outstanding_tags =
+        gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+                                              cq->outstanding_tag_capacity);
+  }
+  cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+  gpr_mu_unlock(cq->mu);
+#endif
  cq->vtable->begin_op(cq, tag);
 }
 
 #ifndef NDEBUG
-static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {
-  cq_data *cqd = &cc->data;
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
   int found = 0;
   if (lock_cq) {
-    gpr_mu_lock(cqd->mu);
+    gpr_mu_lock(cq->mu);
   }
 
-  for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) {
-    if (cqd->outstanding_tags[i] == tag) {
-      cqd->outstanding_tag_count--;
-      GPR_SWAP(void *, cqd->outstanding_tags[i],
-               cqd->outstanding_tags[cqd->outstanding_tag_count]);
+  for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
+    if (cq->outstanding_tags[i] == tag) {
+      cq->outstanding_tag_count--;
+      GPR_SWAP(void *, cq->outstanding_tags[i],
+               cq->outstanding_tags[cq->outstanding_tag_count]);
       found = 1;
       break;
     }
   }
 
   if (lock_cq) {
-    gpr_mu_unlock(cqd->mu);
+    gpr_mu_unlock(cq->mu);
   }
 
   GPR_ASSERT(found);
 }
 #else
-static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {}
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
 #endif
 
-/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion
  * type of GRPC_CQ_NEXT) */
 static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
-                               grpc_completion_queue *cc, void *tag,
+                               grpc_completion_queue *cq, void *tag,
                                grpc_error *error,
                                void (*done)(grpc_exec_ctx *exec_ctx,
                                             void *done_arg,
@@ -553,16 +588,16 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
       error != GRPC_ERROR_NONE)) {
     const char *errmsg = grpc_error_string(error);
     GRPC_API_TRACE(
-        "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+        "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
         "done=%p, done_arg=%p, storage=%p)",
-        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+        7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
     if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
         error != GRPC_ERROR_NONE) {
       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
     }
   }
 
-  cq_data *cqd = &cc->data;
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
   int is_success = (error == GRPC_ERROR_NONE);
 
   storage->tag = tag;
@@ -570,28 +605,42 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
   storage->done_arg = done_arg;
   storage->next = (uintptr_t)(is_success);
 
-  cq_check_tag(cc, tag, true); /* Used in debug builds only */
+  cq_check_tag(cq, tag, true); /* Used in debug builds only */
 
   /* Add the completion to the queue */
-  cq_event_queue_push(&cqd->queue, storage);
+  bool is_first = cq_event_queue_push(&cqd->queue, storage);
   gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
-
-  gpr_mu_lock(cqd->mu);
-
-  int shutdown = gpr_unref(&cqd->pending_events);
-  if (!shutdown) {
-    grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL);
-    gpr_mu_unlock(cqd->mu);
-
-    if (kick_error != GRPC_ERROR_NONE) {
-      const char *msg = grpc_error_string(kick_error);
-      gpr_log(GPR_ERROR, "Kick failed: %s", msg);
-
-      GRPC_ERROR_UNREF(kick_error);
+  bool will_definitely_shutdown =
+      gpr_atm_no_barrier_load(&cqd->pending_events) == 1;
+
+  if (!will_definitely_shutdown) {
+    /* Only kick if this is the first item queued */
+    if (is_first) {
+      gpr_mu_lock(cq->mu);
+      grpc_error *kick_error =
+          cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+      gpr_mu_unlock(cq->mu);
+
+      if (kick_error != GRPC_ERROR_NONE) {
+        const char *msg = grpc_error_string(kick_error);
+        gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+        GRPC_ERROR_UNREF(kick_error);
+      }
+    }
+    if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+      GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+      gpr_mu_lock(cq->mu);
+      cq_finish_shutdown_next(exec_ctx, cq);
+      gpr_mu_unlock(cq->mu);
+      GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
    }
   } else {
-    cq_finish_shutdown(exec_ctx, cc);
-    gpr_mu_unlock(cqd->mu);
+    GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+    gpr_atm_rel_store(&cqd->pending_events, 0);
+    gpr_mu_lock(cq->mu);
+    cq_finish_shutdown_next(exec_ctx, cq);
+    gpr_mu_unlock(cq->mu);
+    GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
   }
 
   GPR_TIMER_END("cq_end_op_for_next", 0);
@@ -599,16 +648,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
   GRPC_ERROR_UNREF(error);
 }
 
-/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion
  * type of GRPC_CQ_PLUCK) */
 static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
-                                grpc_completion_queue *cc, void *tag,
+                                grpc_completion_queue *cq, void *tag,
                                 grpc_error *error,
                                 void (*done)(grpc_exec_ctx *exec_ctx,
                                              void *done_arg,
                                              grpc_cq_completion *storage),
                                 void *done_arg, grpc_cq_completion *storage) {
-  cq_data *cqd = &cc->data;
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
   int is_success = (error == GRPC_ERROR_NONE);
 
   GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
@@ -618,9 +668,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
       error != GRPC_ERROR_NONE)) {
     const char *errmsg = grpc_error_string(error);
     GRPC_API_TRACE(
-        "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+        "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
-        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+        7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
     if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
         error != GRPC_ERROR_NONE) {
       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -632,8 +682,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
   storage->done_arg = done_arg;
   storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success));
 
-  gpr_mu_lock(cqd->mu);
-  cq_check_tag(cc, tag, false); /* Used in debug builds only */
+  gpr_mu_lock(cq->mu);
+  cq_check_tag(cq, tag, false); /* Used in debug builds only */
 
   /* Add to the list of completions */
   gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
@@ -652,9 +702,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
     }
 
     grpc_error *kick_error =
-        cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
+        cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
 
-    gpr_mu_unlock(cqd->mu);
+    gpr_mu_unlock(cq->mu);
 
     if (kick_error != GRPC_ERROR_NONE) {
       const char *msg = grpc_error_string(kick_error);
@@ -663,8 +713,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
       GRPC_ERROR_UNREF(kick_error);
     }
   } else {
-    cq_finish_shutdown(exec_ctx, cc);
-    gpr_mu_unlock(cqd->mu);
+    cq_finish_shutdown_pluck(exec_ctx, cq);
+    gpr_mu_unlock(cq->mu);
   }
 
   GPR_TIMER_END("cq_end_op_for_pluck", 0);
@@ -672,12 +722,12 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
   GRPC_ERROR_UNREF(error);
 }
 
-void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
                     void *tag, grpc_error *error,
                     void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
                                  grpc_cq_completion *storage),
                     void *done_arg, grpc_cq_completion *storage) {
-  cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage);
+  cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
 }
 
 typedef struct {
@@ -692,7 +742,7 @@ typedef struct {
 static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
   cq_is_finished_arg *a = arg;
   grpc_completion_queue *cq = a->cq;
-  cq_data *cqd = &cq->data;
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
   GPR_ASSERT(a->stolen_completion == NULL);
 
   gpr_atm current_last_seen_things_queued_ever =
@@ -703,7 +753,8 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
         gpr_atm_no_barrier_load(&cqd->things_queued_ever);
 
    /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
-     * might return NULL in some cases even if the queue is not empty; but that
+     * might return NULL in some cases even if the queue is not empty; but
+     * that
      * is ok and doesn't affect correctness. Might affect the tail latencies a
      * bit) */
     a->stolen_completion = cq_event_queue_pop(&cqd->queue);
@@ -716,58 +767,56 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
 }
 
 #ifndef NDEBUG
-static void dump_pending_tags(grpc_completion_queue *cc) {
+static void dump_pending_tags(grpc_completion_queue *cq) {
   if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return;
 
-  cq_data *cqd = &cc->data;
-
   gpr_strvec v;
   gpr_strvec_init(&v);
   gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
-  gpr_mu_lock(cqd->mu);
-  for (size_t i = 0; i < cqd->outstanding_tag_count; i++) {
+  gpr_mu_lock(cq->mu);
+  for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
     char *s;
-    gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]);
+    gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
     gpr_strvec_add(&v, s);
   }
-  gpr_mu_unlock(cqd->mu);
+  gpr_mu_unlock(cq->mu);
   char *out = gpr_strvec_flatten(&v, NULL);
   gpr_strvec_destroy(&v);
   gpr_log(GPR_DEBUG, "%s", out);
   gpr_free(out);
 }
 #else
-static void dump_pending_tags(grpc_completion_queue *cc) {}
+static void dump_pending_tags(grpc_completion_queue *cq) {}
 #endif
 
-static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
                           void *reserved) {
   grpc_event ret;
   gpr_timespec now;
-  cq_data *cqd = &cc->data;
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
 
   GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
 
   GRPC_API_TRACE(
       "grpc_completion_queue_next("
-      "cc=%p, "
+      "cq=%p, "
       "deadline=gpr_timespec { tv_sec: %" PRId64
       ", tv_nsec: %d, clock_type: %d }, "
      "reserved=%p)",
-      5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+      5, (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
          reserved));
   GPR_ASSERT(!reserved);
 
-  dump_pending_tags(cc);
+  dump_pending_tags(cq);
 
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
 
-  GRPC_CQ_INTERNAL_REF(cc, "next");
+  GRPC_CQ_INTERNAL_REF(cq, "next");
 
   cq_is_finished_arg is_finished_arg = {
       .last_seen_things_queued_ever =
          gpr_atm_no_barrier_load(&cqd->things_queued_ever),
-      .cq = cc,
+      .cq = cq,
      .deadline = deadline,
      .stolen_completion = NULL,
      .tag = NULL,
@@ -800,21 +849,24 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
      /* If c == NULL it means either the queue is empty OR in a transient
         inconsistent state. If it is the latter, we should do a 0-timeout poll
         so that the thread comes back quickly from poll to make a second
-         attempt at popping. Not doing this can potentially deadlock this thread
+         attempt at popping. Not doing this can potentially deadlock this
+         thread
         forever (if the deadline is infinity) */
      if (cq_event_queue_num_items(&cqd->queue) > 0) {
        iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
      }
    }
 
-    if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
+    if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) {
      /* Before returning, check if the queue has any items left over (since
         gpr_mpscq_pop() can sometimes return NULL even if the queue is not
         empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
      if (cq_event_queue_num_items(&cqd->queue) > 0) {
        /* Go to the beginning of the loop. No point doing a poll because
-           (cc->shutdown == true) is only possible when there is no pending work
-           (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion
+           (cq->shutdown == true) is only possible when there is no pending
+           work
+           (i.e cq->pending_events == 0) and any outstanding
+           grpc_cq_completion
           events are already queued on this cq */
        continue;
      }
@@ -828,16 +880,16 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
    if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
-      dump_pending_tags(cc);
+      dump_pending_tags(cq);
      break;
    }
 
    /* The main polling work happens in grpc_pollset_work */
-    gpr_mu_lock(cqd->mu);
-    cqd->num_polls++;
-    grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+    gpr_mu_lock(cq->mu);
+    cq->num_polls++;
+    grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
                                              NULL, now, iteration_deadline);
-    gpr_mu_unlock(cqd->mu);
+    gpr_mu_unlock(cq->mu);
 
    if (err != GRPC_ERROR_NONE) {
      const char *msg = grpc_error_string(err);
@@ -846,30 +898,74 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
      GRPC_ERROR_UNREF(err);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
-      dump_pending_tags(cc);
+      dump_pending_tags(cq);
      break;
    }
    is_finished_arg.first_loop = false;
  }
 
-  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
-  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next");
+  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
  grpc_exec_ctx_finish(&exec_ctx);
  GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
 
+  if (cq_event_queue_num_items(&cqd->queue) > 0 &&
+      gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
+    gpr_mu_lock(cq->mu);
+    cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+    gpr_mu_unlock(cq->mu);
+  }
+
  GPR_TIMER_END("grpc_completion_queue_next", 0);
 
  return ret;
 }
 
-grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
+/* Finishes the completion queue shutdown. This means that there are no more
+   completion events / tags expected from the completion queue
+   - Must be called under completion queue lock
+   - Must be called only once in completion queue's lifetime
+   - grpc_completion_queue_shutdown() MUST have been called before calling
+   this function */
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+                                    grpc_completion_queue *cq) {
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+  GPR_ASSERT(cqd->shutdown_called);
+  GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
+
+  cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+                              &cq->pollset_shutdown_done);
+}
+
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+                             grpc_completion_queue *cq) {
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+  GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+  gpr_mu_lock(cq->mu);
+  if (cqd->shutdown_called) {
+    gpr_mu_unlock(cq->mu);
+    GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+    GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
+    return;
+  }
+  cqd->shutdown_called = 1;
+  if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+    cq_finish_shutdown_next(exec_ctx, cq);
+  }
+  gpr_mu_unlock(cq->mu);
+  GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+}
+
+grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
                                       gpr_timespec deadline, void *reserved) {
-  return cc->vtable->next(cc, deadline, reserved);
+  return cq->vtable->next(cq, deadline, reserved);
 }
 
-static int add_plucker(grpc_completion_queue *cc, void *tag,
+static int add_plucker(grpc_completion_queue *cq, void *tag,
                        grpc_pollset_worker **worker) {
-  cq_data *cqd = &cc->data;
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
  if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
    return 0;
  }
@@ -879,9 +975,9 @@ static int add_plucker(grpc_completion_queue *cc, void *tag,
  return 1;
 }
 
-static void del_plucker(grpc_completion_queue *cc, void *tag,
+static void del_plucker(grpc_completion_queue *cq, void *tag,
                        grpc_pollset_worker **worker) {
-  cq_data *cqd = &cc->data;
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
  for (int i = 0; i < cqd->num_pluckers; i++) {
    if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
      cqd->num_pluckers--;
@@ -895,13 +991,13 @@ static void del_plucker(grpc_completion_queue *cc, void *tag,
 static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
  cq_is_finished_arg *a = arg;
  grpc_completion_queue *cq = a->cq;
-  cq_data *cqd = &cq->data;
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
 
  GPR_ASSERT(a->stolen_completion == NULL);
  gpr_atm current_last_seen_things_queued_ever =
      gpr_atm_no_barrier_load(&cqd->things_queued_ever);
  if (current_last_seen_things_queued_ever !=
      a->last_seen_things_queued_ever) {
-    gpr_mu_lock(cqd->mu);
+    gpr_mu_lock(cq->mu);
    a->last_seen_things_queued_ever =
        gpr_atm_no_barrier_load(&cqd->things_queued_ever);
    grpc_cq_completion *c;
@@ -913,51 +1009,51 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
        if (c == cqd->completed_tail) {
          cqd->completed_tail = prev;
        }
-        gpr_mu_unlock(cqd->mu);
+        gpr_mu_unlock(cq->mu);
        a->stolen_completion = c;
        return true;
      }
      prev = c;
    }
-    gpr_mu_unlock(cqd->mu);
+    gpr_mu_unlock(cq->mu);
  }
  return !a->first_loop &&
         gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
 }
 
-static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
                           gpr_timespec deadline, void *reserved) {
  grpc_event ret;
  grpc_cq_completion *c;
  grpc_cq_completion *prev;
  grpc_pollset_worker *worker = NULL;
  gpr_timespec now;
-  cq_data *cqd = &cc->data;
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
 
  GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
 
  if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) {
    GRPC_API_TRACE(
        "grpc_completion_queue_pluck("
-        "cc=%p, tag=%p, "
+        "cq=%p, tag=%p, "
        "deadline=gpr_timespec { tv_sec: %" PRId64
        ", tv_nsec: %d, clock_type: %d }, "
        "reserved=%p)",
-        6, (cc, tag, deadline.tv_sec, deadline.tv_nsec,
+        6, (cq, tag, deadline.tv_sec, deadline.tv_nsec,
           (int)deadline.clock_type, reserved));
  }
  GPR_ASSERT(!reserved);
 
-  dump_pending_tags(cc);
+  dump_pending_tags(cq);
 
  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
 
-  GRPC_CQ_INTERNAL_REF(cc, "pluck");
-  gpr_mu_lock(cqd->mu);
+  GRPC_CQ_INTERNAL_REF(cq, "pluck");
+  gpr_mu_lock(cq->mu);
  cq_is_finished_arg is_finished_arg = {
      .last_seen_things_queued_ever =
          gpr_atm_no_barrier_load(&cqd->things_queued_ever),
-      .cq = cc,
+      .cq = cq,
      .deadline = deadline,
      .stolen_completion = NULL,
      .tag = tag,
@@ -966,7 +1062,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
      GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
  for (;;) {
    if (is_finished_arg.stolen_completion != NULL) {
-      gpr_mu_unlock(cqd->mu);
+      gpr_mu_unlock(cq->mu);
      c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = NULL;
      ret.type = GRPC_OP_COMPLETE;
@@ -983,7 +1079,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
        if (c == cqd->completed_tail) {
          cqd->completed_tail = prev;
        }
-        gpr_mu_unlock(cqd->mu);
+        gpr_mu_unlock(cq->mu);
        ret.type = GRPC_OP_COMPLETE;
        ret.success = c->next & 1u;
        ret.tag = c->tag;
@@ -993,54 +1089,54 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
      prev = c;
    }
    if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
-      gpr_mu_unlock(cqd->mu);
+      gpr_mu_unlock(cq->mu);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_SHUTDOWN;
      break;
    }
-    if (!add_plucker(cc, tag, &worker)) {
+    if (!add_plucker(cq, tag, &worker)) {
      gpr_log(GPR_DEBUG,
             "Too many outstanding grpc_completion_queue_pluck calls: maximum "
             "is %d",
             GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
-      gpr_mu_unlock(cqd->mu);
+      gpr_mu_unlock(cq->mu);
      memset(&ret, 0, sizeof(ret));
      /* TODO(ctiller): should we use a different result here */
      ret.type = GRPC_QUEUE_TIMEOUT;
-      dump_pending_tags(cc);
+      dump_pending_tags(cq);
      break;
    }
    now = gpr_now(GPR_CLOCK_MONOTONIC);
    if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
-      del_plucker(cc, tag, &worker);
-      gpr_mu_unlock(cqd->mu);
+      del_plucker(cq, tag, &worker);
+      gpr_mu_unlock(cq->mu);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
-      dump_pending_tags(cc);
+      dump_pending_tags(cq);
      break;
    }
-    cqd->num_polls++;
-    grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+    cq->num_polls++;
+    grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
                                              &worker, now, deadline);
    if (err != GRPC_ERROR_NONE) {
-      del_plucker(cc, tag, &worker);
-      gpr_mu_unlock(cqd->mu);
+      del_plucker(cq, tag, &worker);
+      gpr_mu_unlock(cq->mu);
      const char *msg = grpc_error_string(err);
      gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
 
      GRPC_ERROR_UNREF(err);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
-      dump_pending_tags(cc);
+      dump_pending_tags(cq);
      break;
    }
    is_finished_arg.first_loop = false;
-    del_plucker(cc, tag, &worker);
+    del_plucker(cq, tag, &worker);
  }
 done:
-  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
-  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck");
+  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck");
  grpc_exec_ctx_finish(&exec_ctx);
  GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
 
@@ -1049,85 +1145,66 @@ done:
  return ret;
 }
 
-grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
+grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
                                       gpr_timespec deadline, void *reserved) {
-  return cc->vtable->pluck(cc, tag, deadline, reserved);
+  return cq->vtable->pluck(cq, tag, deadline, reserved);
 }
 
-/* Finishes the completion queue shutdown. This means that there are no more
-   completion events / tags expected from the completion queue
-   - Must be called under completion queue lock
-   - Must be called only once in completion queue's lifetime
-   - grpc_completion_queue_shutdown() MUST have been called before calling
-   this function */
-static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
-                               grpc_completion_queue *cc) {
-  cq_data *cqd = &cc->data;
+static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+                                     grpc_completion_queue *cq) {
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
 
  GPR_ASSERT(cqd->shutdown_called);
  GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
  gpr_atm_no_barrier_store(&cqd->shutdown, 1);
 
-  cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
-                              &cqd->pollset_shutdown_done);
+  cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+                              &cq->pollset_shutdown_done);
 }
 
-/* Shutdown simply drops a ref that we reserved at creation time; if we drop
-   to zero here, then enter shutdown mode and wake up any waiters */
-void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
-  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
-  GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
-  cq_data *cqd = &cc->data;
+static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+                              grpc_completion_queue *cq) {
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
 
-  gpr_mu_lock(cqd->mu);
+  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
-    gpr_mu_unlock(cqd->mu);
+    gpr_mu_unlock(cq->mu);
    GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
    return;
  }
  cqd->shutdown_called = 1;
  if (gpr_unref(&cqd->pending_events)) {
-    cq_finish_shutdown(&exec_ctx, cc);
+    cq_finish_shutdown_pluck(exec_ctx, cq);
  }
-  gpr_mu_unlock(cqd->mu);
+  gpr_mu_unlock(cq->mu);
+}
+
+/* Shutdown simply drops a ref that we reserved at creation time; if we drop
+   to zero here, then enter shutdown mode and wake up any waiters */
+void grpc_completion_queue_shutdown(grpc_completion_queue *cq) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
+  GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
+  cq->vtable->shutdown(&exec_ctx, cq);
  grpc_exec_ctx_finish(&exec_ctx);
  GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
 }
 
-void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
-  GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
+void grpc_completion_queue_destroy(grpc_completion_queue *cq) {
+  GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
  GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
-  grpc_completion_queue_shutdown(cc);
-
-  /* TODO (sreek): This should not ideally be here. Refactor it into the
-   * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */
-  if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) {
-    GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0);
-  }
+  grpc_completion_queue_shutdown(cq);
 
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy");
+  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy");
  grpc_exec_ctx_finish(&exec_ctx);
 
  GPR_TIMER_END("grpc_completion_queue_destroy", 0);
 }
 
-grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
-  return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
-}
-
-grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
-  return CQ_FROM_POLLSET(ps);
-}
-
-void grpc_cq_mark_server_cq(grpc_completion_queue *cc) {
-  cc->data.is_server_cq = 1;
-}
-
-bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
-  return cc->data.is_server_cq;
+grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) {
+  return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL;
 }
 
-bool grpc_cq_can_listen(grpc_completion_queue *cc) {
-  return cc->poller_vtable->can_listen;
+bool grpc_cq_can_listen(grpc_completion_queue *cq) {
+  return cq->poller_vtable->can_listen;
 }
Refactor it into the - * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */ - if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) { - GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0); - } + grpc_completion_queue_shutdown(cq); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy"); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_destroy", 0); } -grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; -} - -grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { - return CQ_FROM_POLLSET(ps); -} - -void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { - cc->data.is_server_cq = 1; -} - -bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { - return cc->data.is_server_cq; +grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) { + return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL; } -bool grpc_cq_can_listen(grpc_completion_queue *cc) { - return cc->poller_vtable->can_listen; +bool grpc_cq_can_listen(grpc_completion_queue *cq) { + return cq->poller_vtable->can_listen; } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 97ea9cae20..af44482513 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -84,10 +84,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *done_arg, grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); -grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); -void grpc_cq_mark_server_cq(grpc_completion_queue *cc); -bool grpc_cq_is_server_cq(grpc_completion_queue *cc); bool grpc_cq_can_listen(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 0cd436883a..de0a91e2b2 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -975,8 +975,6 @@ static void register_completion_queue(grpc_server *server, if (server->cqs[i] == cq) return; } - grpc_cq_mark_server_cq(cq); - GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, @@ -1156,9 +1154,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, chand->channel = channel; size_t cq_idx; - grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset); for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { - if (s->cqs[cq_idx] == accepting_cq) break; + if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break; } if (cq_idx == s->cq_count) { /* completion queue not found: pick a random one to publish new calls to */ diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index bd7d4ad691..a871ea895a 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -27,8 +27,8 @@ * immediately, unless flow control prevents it. * If it is throttled and keeps receiving values, as well as if it receives values before being * started, it will buffer them and propagate them in order as soon as its state becomes Started. - * If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and - * propagate the error immediately. 
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index bd7d4ad691..a871ea895a 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -27,8 +27,8 @@
 * immediately, unless flow control prevents it.
 * If it is throttled and keeps receiving values, as well as if it receives values before being
 * started, it will buffer them and propagate them in order as soon as its state becomes Started.
- * If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
- * propagate the error immediately.
+ * If it receives an end of stream (via -writesFinishedWithError:), it will buffer the EOS after the
+ * last buffered value and issue it to the writeable after all buffered values are issued.
 *
 * Beware that a pipe of this type can't prevent receiving more values when it is paused (for
 * example if used to write data to a congested network connection). Because in such situations the
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index e4a7cc40f9..99cb0ad971 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -18,11 +18,13 @@

 #import "GRXBufferedPipe.h"

+@interface GRXBufferedPipe ()
+@property(atomic) id<GRXWriteable> writeable;
+@end
+
 @implementation GRXBufferedPipe {
- id<GRXWriteable> _writeable;
- NSMutableArray *_queue;
- BOOL _inputIsFinished;
 NSError *_errorOrNil;
+ dispatch_queue_t _writeQueue;
 }

 @synthesize state = _state;
@@ -33,99 +35,79 @@

 - (instancetype)init {
 if (self = [super init]) {
- _queue = [NSMutableArray array];
 _state = GRXWriterStateNotStarted;
+ _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ dispatch_suspend(_writeQueue);
 }
 return self;
 }

-- (id)popValue {
- id value = _queue[0];
- [_queue removeObjectAtIndex:0];
- return value;
-}
-
-- (void)writeBufferUntilPausedOrStopped {
- while (_state == GRXWriterStateStarted && _queue.count > 0) {
- [_writeable writeValue:[self popValue]];
- }
- if (_inputIsFinished && _queue.count == 0) {
- // Our writer finished normally while we were paused or not-started-yet.
- [self finishWithError:_errorOrNil];
- }
-}
-
 #pragma mark GRXWriteable implementation

-// Returns whether events can be simply propagated to the other end of the pipe.
-- (BOOL)shouldFastForward {
- return _state == GRXWriterStateStarted && _queue.count == 0;
-}
-
 - (void)writeValue:(id)value {
- if (self.shouldFastForward) {
- // Skip the queue.
- [_writeable writeValue:value];
- } else {
+ if ([value respondsToSelector:@selector(copy)]) {
 // Even if we're paused and with enqueued values, we can't exert back-pressure to our writer.
 // So just buffer the new value.
 // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
- if ([value respondsToSelector:@selector(copy)]) {
- value = [value copy];
- }
- [_queue addObject:value];
+ value = [value copy];
 }
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^(void) {
+ [weakSelf.writeable writeValue:value];
+ });
 }

 - (void)writesFinishedWithError:(NSError *)errorOrNil {
- _inputIsFinished = YES;
- _errorOrNil = errorOrNil;
- if (errorOrNil || self.shouldFastForward) {
- // No need to write pending values.
- [self finishWithError:_errorOrNil];
- }
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^{
+ [weakSelf finishWithError:errorOrNil];
+ });
 }

 #pragma mark GRXWriter implementation

 - (void)setState:(GRXWriterState)newState {
- // Manual transitions are only allowed from the started or paused states.
- if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
- return;
- }
-
- switch (newState) {
- case GRXWriterStateFinished:
- _state = newState;
- _queue = nil;
- // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
- // writeable to be messaged anymore.
- _writeable = nil;
- return;
- case GRXWriterStatePaused:
- _state = newState;
+ @synchronized (self) {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
 return;
- case GRXWriterStateStarted:
- if (_state == GRXWriterStatePaused) {
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
+ self.writeable = nil;
+ if (_state == GRXWriterStatePaused) {
+ dispatch_resume(_writeQueue);
+ }
 _state = newState;
- [self writeBufferUntilPausedOrStopped];
- }
- return;
- case GRXWriterStateNotStarted:
- return;
+ return;
+ case GRXWriterStatePaused:
+ if (_state == GRXWriterStateStarted) {
+ _state = newState;
+ dispatch_suspend(_writeQueue);
+ }
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ dispatch_resume(_writeQueue);
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
 }
 }

 - (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ self.writeable = writeable;
 _state = GRXWriterStateStarted;
- _writeable = writeable;
- [self writeBufferUntilPausedOrStopped];
+ dispatch_resume(_writeQueue);
 }

 - (void)finishWithError:(NSError *)errorOrNil {
- id<GRXWriteable> writeable = _writeable;
+ [self.writeable writesFinishedWithError:errorOrNil];
 self.state = GRXWriterStateFinished;
- [writeable writesFinishedWithError:errorOrNil];
 }

 @end
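The rewritten GRXBufferedPipe drops its hand-rolled NSMutableArray buffer in favor of a serial dispatch queue created suspended: each -writeValue: enqueues a block, pause/start map to dispatch_suspend/dispatch_resume, and the end of stream is simply the last enqueued block, which is exactly the EOS ordering the revised header comment promises. A self-contained C sketch of the same suspend/resume buffering idea using libdispatch's plain-C API (the names here are this example's own, not the pipe's):

```c
#include <dispatch/dispatch.h>
#include <stdio.h>

/* Work item: "delivers" one buffered value to the consumer. */
static void write_value(void *ctx) { printf("wrote %s\n", (const char *)ctx); }

int main(void) {
  /* Like -[GRXBufferedPipe init]: a serial queue created suspended, so
     anything enqueued before "start" just accumulates, in order. */
  dispatch_queue_t write_queue =
      dispatch_queue_create("buffered.pipe.sketch", DISPATCH_QUEUE_SERIAL);
  dispatch_suspend(write_queue);

  /* Like -writeValue: before the pipe is started: buffered, not delivered. */
  dispatch_async_f(write_queue, (void *)"value1", write_value);
  dispatch_async_f(write_queue, (void *)"value2", write_value);

  /* Like -startWithWriteable:: resuming drains the buffer in FIFO order. */
  dispatch_resume(write_queue);

  /* Like the buffered EOS: queued last, delivered last; dispatch_sync_f
     also blocks main() until everything ahead of it has run. */
  dispatch_sync_f(write_queue, (void *)"eos", write_value);
  return 0;
}
```

Note how the Finished transition in setState resumes a paused queue before the pipe lets go of it: libdispatch forbids releasing an object while it is suspended, so a paused pipe has to be resumed on its way to Finished.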
diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m
index f152452b01..fa3ded4c0c 100644
--- a/src/objective-c/tests/RxLibraryUnitTests.m
+++ b/src/objective-c/tests/RxLibraryUnitTests.m
@@ -23,6 +23,8 @@
 #import <RxLibrary/GRXWriteable.h>
 #import <RxLibrary/GRXWriter.h>

+#define TEST_TIMEOUT 1
+
 // A mock of a GRXSingleValueHandler block that can be queried for how many times it was called and
 // what were the last values passed to it.
 //
@@ -140,26 +142,38 @@
 #pragma mark BufferedPipe

 - (void)testBufferedPipePropagatesValue {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
 // Given:
 CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
 id anyValue = @7;

 // If:
 GRXBufferedPipe *pipe = [GRXBufferedPipe pipe];
 [pipe startWithWriteable:writeable];
 [pipe writeValue:anyValue];
+ [pipe writesFinishedWithError:nil];

 // Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
 XCTAssertEqual(handler.timesCalled, 1);
 XCTAssertEqualObjects(handler.value, anyValue);
 XCTAssertEqualObjects(handler.errorOrNil, nil);
+
 }

 - (void)testBufferedPipePropagatesError {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
 // Given:
 CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
 NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil];

 // If:
@@ -168,15 +182,20 @@
 [pipe writesFinishedWithError:anyError];

 // Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
 XCTAssertEqual(handler.timesCalled, 1);
 XCTAssertEqualObjects(handler.value, nil);
 XCTAssertEqualObjects(handler.errorOrNil, anyError);
 }

 - (void)testBufferedPipeFinishWriteWhilePaused {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
 // Given:
 CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
 id anyValue = @7;

 // If:
@@ -188,6 +207,7 @@
 [pipe startWithWriteable:writeable];

 // Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
 XCTAssertEqual(handler.timesCalled, 1);
 XCTAssertEqualObjects(handler.value, anyValue);
 XCTAssertEqualObjects(handler.errorOrNil, nil);
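Because delivery now happens asynchronously on the pipe's queue, the unit tests above can no longer assert immediately after writing; they wrap the capturing handler in a block that fulfills an XCTestExpectation and wait up to TEST_TIMEOUT before asserting. The same wait-with-timeout shape in plain C, substituting a dispatch semaphore for the expectation (illustrative names, not a test-framework API):

```c
#include <dispatch/dispatch.h>
#include <stdio.h>

/* Runs on the async queue; signaling is the analogue of [expectation fulfill]. */
static void fulfill(void *ctx) {
  printf("value delivered\n");
  dispatch_semaphore_signal((dispatch_semaphore_t)ctx);
}

int main(void) {
  dispatch_queue_t q = dispatch_queue_create("async.delivery", NULL);
  dispatch_semaphore_t done = dispatch_semaphore_create(0);

  dispatch_async_f(q, done, fulfill);

  /* Analogue of -waitForExpectationsWithTimeout:: bounded wait, then assert. */
  dispatch_time_t timeout = dispatch_time(DISPATCH_TIME_NOW, 1 * NSEC_PER_SEC);
  if (dispatch_semaphore_wait(done, timeout) != 0) {
    fprintf(stderr, "timed out waiting for delivery\n");
    return 1;
  }
  return 0;
}
```

XCTestExpectation is the framework-native tool the tests use; the semaphore version only shows why the wait has to come before the assertions.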
diff --git a/templates/package.json.template b/templates/package.json.template
index af13d528db..50893d3a54 100644
--- a/templates/package.json.template
+++ b/templates/package.json.template
@@ -58,7 +58,7 @@
 },
 "binary": {
 "module_name": "grpc_node",
- "module_path": "src/node/extension_binary",
+ "module_path": "src/node/extension_binary/{node_abi}-{platform}-{arch}",
 "host": "https://storage.googleapis.com/",
 "remote_path": "grpc-precompiled-binaries/node/{name}/v{version}",
 "package_name": "{node_abi}-{platform}-{arch}.tar.gz"
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index c27337aaa8..f9d88d6327 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -93,7 +93,7 @@ static void test_pollset_conversion(void) {
 attr.cq_polling_type = polling_types[j];
 cq = grpc_completion_queue_create(
 grpc_completion_queue_factory_lookup(&attr), &attr, NULL);
- GPR_ASSERT(grpc_cq_from_pollset(grpc_cq_pollset(cq)) == cq);
+ GPR_ASSERT(grpc_cq_pollset(cq) != NULL);
 shutdown_and_destroy(cq);
 }
 }
diff --git a/tools/internal_ci/linux/grpc_interop_tocloud.cfg b/tools/internal_ci/linux/grpc_interop_tocloud.cfg
index 1f6421c83d..0c31b497af 100644
--- a/tools/internal_ci/linux/grpc_interop_tocloud.cfg
+++ b/tools/internal_ci/linux/grpc_interop_tocloud.cfg
@@ -20,6 +20,6 @@ build_file: "grpc/tools/internal_ci/linux/grpc_interop_tocloud.sh"
 timeout_mins: 480
 action {
 define_artifacts {
- regex: "**/report.xml"
+ regex: "**/sponge_log.xml"
 }
 }
diff --git a/tools/internal_ci/linux/grpc_interop_tocloud.sh b/tools/internal_ci/linux/grpc_interop_tocloud.sh
index fe5c9a5130..e3ba25af5d 100755
--- a/tools/internal_ci/linux/grpc_interop_tocloud.sh
+++ b/tools/internal_ci/linux/grpc_interop_tocloud.sh
@@ -23,4 +23,4 @@ cd $(dirname $0)/../../..
 source tools/internal_ci/helper_scripts/prepare_build_linux_rc
 source tools/internal_ci/helper_scripts/prepare_build_interop_rc

-tools/run_tests/run_interop_tests.py -l all -s all --use_docker --http2_interop -t -j 12 $@
+tools/run_tests/run_interop_tests.py -l all -s all --use_docker --http2_interop --internal_ci -t -j 12 $@
diff --git a/tools/internal_ci/linux/grpc_interop_toprod.cfg b/tools/internal_ci/linux/grpc_interop_toprod.cfg
new file mode 100644
index 0000000000..18978b87e5
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_interop_toprod.cfg
@@ -0,0 +1,26 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/grpc_interop_toprod.sh"
+# cloud-to-prod interop runs are much quicker than the full tocloud matrix
+timeout_mins: 60
+action {
+ define_artifacts {
+ regex: "**/sponge_log.xml"
+ }
+}
+
diff --git a/tools/internal_ci/linux/grpc_interop_toprod.sh b/tools/internal_ci/linux/grpc_interop_toprod.sh
new file mode 100644
index 0000000000..3d06185406
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_interop_toprod.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -ex
+
+export LANG=en_US.UTF-8
+
+# Enter the gRPC repo root
+cd $(dirname $0)/../../..
+
+source tools/internal_ci/helper_scripts/prepare_build_linux_rc
+source tools/internal_ci/helper_scripts/prepare_build_interop_rc
+
+tools/run_tests/run_interop_tests.py \
+ -l all \
+ --cloud_to_prod \
+ --cloud_to_prod_auth \
+ --prod_servers default gateway_v4 \
+ --use_docker --internal_ci -t -j 12 $@
+
diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py
index 80062aa37d..1e702a8636 100755
--- a/tools/run_tests/run_interop_tests.py
+++ b/tools/run_tests/run_interop_tests.py
@@ -63,6 +63,13 @@ _TEST_TIMEOUT = 3*60
 # see https://github.com/grpc/grpc/issues/9779
 _SKIP_DATA_FRAME_PADDING = ['data_frame_padding']

+# report suffix is important for reports to get picked up by internal CI
+_INTERNAL_CL_XML_REPORT = 'sponge_log.xml'
+
+# default report name, used when not running under the internal CI
+_XML_REPORT = 'report.xml'
+
+
 class CXXLanguage:

 def __init__(self):
@@ -943,7 +950,12 @@ argp.add_argument('--insecure',
 action='store_const',
 const=True,
 help='Whether to use secure channel.')
-
+argp.add_argument('--internal_ci',
+ default=False,
+ action='store_const',
+ const=True,
+ help=('Put reports into subdirectories to improve '
+ 'presentation of results by Internal CI.'))
 args = argp.parse_args()

 servers = set(s for s in itertools.chain.from_iterable(_SERVERS
@@ -1201,7 +1213,10 @@ try:
 write_cmdlog_maybe(server_manual_cmd_log, 'interop_server_cmds.sh')
 write_cmdlog_maybe(client_manual_cmd_log, 'interop_client_cmds.sh')

- report_utils.render_junit_xml_report(resultset, 'report.xml')
+ xml_report_name = _XML_REPORT
+ if args.internal_ci:
+ xml_report_name = _INTERNAL_CL_XML_REPORT
+ report_utils.render_junit_xml_report(resultset, xml_report_name)

 for name, job in resultset.items():
 if "http2" in name:
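Finally, run_interop_tests.py keys the JUnit report filename off the new --internal_ci flag, matching the **/sponge_log.xml artifact regex the cfg files now declare. The selection itself reduces to a one-liner; a hypothetical C rendering, just to make the flag-to-filename mapping explicit:

```c
#include <stdbool.h>
#include <stdio.h>

/* sponge_log.xml is the suffix the internal CI artifact regex collects;
   report.xml stays the default everywhere else. */
static const char *xml_report_name(bool internal_ci) {
  return internal_ci ? "sponge_log.xml" : "report.xml";
}

int main(void) {
  printf("%s\n", xml_report_name(true));  /* sponge_log.xml */
  printf("%s\n", xml_report_name(false)); /* report.xml */
  return 0;
}
```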