diff options
-rw-r--r-- | include/grpc/support/atm_gcc_atomic.h | 2 | ||||
-rw-r--r-- | include/grpc/support/atm_gcc_sync.h | 5 | ||||
-rw-r--r-- | include/grpc/support/atm_win32.h | 5 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 2 | ||||
-rw-r--r-- | src/core/support/stack_lockfree.c | 12 | ||||
-rw-r--r-- | src/core/surface/server.c | 5 |
8 files changed, 35 insertions, 3 deletions
diff --git a/include/grpc/support/atm_gcc_atomic.h b/include/grpc/support/atm_gcc_atomic.h index 65d3d0c60f..a2c8386028 100644 --- a/include/grpc/support/atm_gcc_atomic.h +++ b/include/grpc/support/atm_gcc_atomic.h @@ -46,6 +46,8 @@ typedef gpr_intptr gpr_atm; #define gpr_atm_no_barrier_load(p) (__atomic_load_n((p), __ATOMIC_RELAXED)) #define gpr_atm_rel_store(p, value) \ (__atomic_store_n((p), (gpr_intptr)(value), __ATOMIC_RELEASE)) +#define gpr_atm_no_barrier_store(p, value) \ + (__atomic_store_n((p), (gpr_intptr)(value), __ATOMIC_RELAXED)) #define gpr_atm_no_barrier_fetch_add(p, delta) \ (__atomic_fetch_add((p), (gpr_intptr)(delta), __ATOMIC_RELAXED)) diff --git a/include/grpc/support/atm_gcc_sync.h b/include/grpc/support/atm_gcc_sync.h index 4955e4436f..38b5a9eec2 100644 --- a/include/grpc/support/atm_gcc_sync.h +++ b/include/grpc/support/atm_gcc_sync.h @@ -68,6 +68,11 @@ static __inline void gpr_atm_rel_store(gpr_atm *p, gpr_atm value) { *p = value; } +static __inline void gpr_atm_no_barrier_store(gpr_atm *p, gpr_atm value) { + GPR_ATM_COMPILE_BARRIER_(); + *p = value; +} + #undef GPR_ATM_LS_BARRIER_ #undef GPR_ATM_COMPILE_BARRIER_ diff --git a/include/grpc/support/atm_win32.h b/include/grpc/support/atm_win32.h index da99021c24..694528a9ba 100644 --- a/include/grpc/support/atm_win32.h +++ b/include/grpc/support/atm_win32.h @@ -57,6 +57,11 @@ static __inline void gpr_atm_rel_store(gpr_atm *p, gpr_atm value) { *p = value; } +static __inline void gpr_atm_no_barrier_store(gpr_atm *p, gpr_atm value) { + /* TODO(ctiller): Can we implement something better here? */ + gpr_atm_rel_store(p, value); +} + static __inline int gpr_atm_no_barrier_cas(gpr_atm *p, gpr_atm o, gpr_atm n) { /* InterlockedCompareExchangePointerNoFence() not available on vista or windows7 */ diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 6ad377ce1c..a2df838d4a 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -102,6 +102,7 @@ static grpc_fd *alloc_fd(int fd) { r->freelist_next = NULL; r->read_watcher = r->write_watcher = NULL; r->on_done_closure = NULL; + r->closed = 0; return r; } @@ -209,6 +210,8 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); if (!has_watchers(fd)) { + GPR_ASSERT(!fd->closed); + fd->closed = 1; close(fd->fd); if (fd->on_done_closure) { grpc_iomgr_add_callback(fd->on_done_closure); @@ -426,7 +429,8 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) { if (kick) { maybe_wake_one_watcher_locked(fd); } - if (grpc_fd_is_orphaned(fd) && !has_watchers(fd)) { + if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { + fd->closed = 1; close(fd->fd); if (fd->on_done_closure != NULL) { grpc_iomgr_add_callback(fd->on_done_closure); diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 94d0019fa4..4e8e267ffd 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -60,6 +60,7 @@ struct grpc_fd { gpr_mu set_state_mu; gpr_atm shutdown; + int closed; /* The watcher list. diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 8538600112..6399aaadb9 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -142,6 +142,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) { static void finish_shutdown(grpc_tcp_server *s) { s->shutdown_complete(s->shutdown_complete_arg); + s->shutdown_complete = NULL; gpr_mu_destroy(&s->mu); @@ -157,6 +158,7 @@ static void destroyed_port(void *server, int success) { gpr_mu_unlock(&s->mu); finish_shutdown(s); } else { + GPR_ASSERT(s->destroyed_ports < s->nports); gpr_mu_unlock(&s->mu); } } diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c index f24e272207..bc741f8c70 100644 --- a/src/core/support/stack_lockfree.c +++ b/src/core/support/stack_lockfree.c @@ -95,6 +95,8 @@ gpr_stack_lockfree *gpr_stack_lockfree_create(int entries) { memset(&stack->pushed, 0, sizeof(stack->pushed)); #endif + GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents)); + /* Point the head at reserved dummy entry */ stack->head.contents.index = INVALID_ENTRY_INDEX; return stack; @@ -108,11 +110,15 @@ void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) { int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { lockfree_node head; lockfree_node newhead; + lockfree_node curent; + lockfree_node newent; /* First fill in the entry's index and aba ctr for new head */ newhead.contents.index = (gpr_uint16)entry; /* Also post-increment the aba_ctr */ - newhead.contents.aba_ctr = stack->entries[entry].contents.aba_ctr++; + curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); + newhead.contents.aba_ctr = ++curent.contents.aba_ctr; + gpr_atm_no_barrier_store(&stack->entries[entry].atm, curent.atm); #ifndef NDEBUG /* Check for double push */ @@ -131,7 +137,9 @@ int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { /* Atomically get the existing head value for use */ head.atm = gpr_atm_no_barrier_load(&(stack->head.atm)); /* Point to it */ - stack->entries[entry].contents.index = head.contents.index; + newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); + newent.contents.index = head.contents.index; + gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm); } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); /* Use rel_cas above to make sure that entry index is set properly */ return head.contents.index == INVALID_ENTRY_INDEX; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index f19bcbd090..7031e63916 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -1271,6 +1271,8 @@ static void done_request_event(void *req, grpc_cq_completion *c) { } else { gpr_free(req); } + + server_unref(server); } static void fail_call(grpc_server *server, requested_call *rc) { @@ -1283,6 +1285,7 @@ static void fail_call(grpc_server *server, requested_call *rc) { rc->data.registered.initial_metadata->count = 0; break; } + server_ref(server); grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion); } @@ -1293,6 +1296,8 @@ static void publish_registered_or_batch(grpc_call *call, int success, grpc_call_stack_element(grpc_call_get_call_stack(call), 0); requested_call *rc = prc; call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + server_ref(chand->server); grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion); GRPC_CALL_INTERNAL_UNREF(call, "server", 0); |