diff options
author | ridiculousfish <corydoras@ridiculousfish.com> | 2012-02-27 19:46:15 -0800 |
---|---|---|
committer | ridiculousfish <corydoras@ridiculousfish.com> | 2012-02-27 19:46:15 -0800 |
commit | cf54ad8242056ae6d686fa6455b24427f4411e18 (patch) | |
tree | a1cc4a8bec1d1688114a19b07a1b29e6b7f9b361 /iothread.cpp | |
parent | fdfa5c06028d3473f57cea497a6682a8e550352f (diff) |
Change to wait for all outstanding iothreads before calling fork(). This should prevent a whole host of threading/fork interactions, but may also compromise performance...we'll see.
Diffstat (limited to 'iothread.cpp')
-rw-r--r-- | iothread.cpp | 71 |
1 files changed, 36 insertions, 35 deletions
diff --git a/iothread.cpp b/iothread.cpp index b98dd8f2..3aed2aad 100644 --- a/iothread.cpp +++ b/iothread.cpp @@ -9,6 +9,7 @@ #include <stdlib.h> #include <unistd.h> #include <signal.h> +#include <queue> #define VOMIT_ON_FAILURE(a) do { if (0 != (a)) { int err = errno; fprintf(stderr, "%s failed on line %d in file %s: %d (%s)\n", #a, __LINE__, __FILE__, err, strerror(err)); abort(); }} while (0) @@ -33,7 +34,6 @@ static struct WorkerThread_t { } threads[IO_MAX_THREADS]; struct ThreadedRequest_t { - struct ThreadedRequest_t *next; int sequenceNumber; int (*handler)(void *); @@ -50,14 +50,14 @@ static struct WorkerThread_t *next_vacant_thread_slot(void) { } static pthread_mutex_t s_request_queue_lock; -static struct ThreadedRequest_t *s_request_queue_head; +static std::queue<ThreadedRequest_t *> s_request_queue; static int s_last_sequence_number; static int s_read_pipe, s_write_pipe; static void iothread_init(void) { - static int inited = 0; + static bool inited = false; if (! inited) { - inited = 1; + inited = true; /* Initialize the queue lock */ VOMIT_ON_FAILURE(pthread_mutex_init(&s_request_queue_lock, NULL)); @@ -76,16 +76,18 @@ static void iothread_init(void) { } static void add_to_queue(struct ThreadedRequest_t *req) { - //requires that the queue lock be held - if (s_request_queue_head == NULL) { - s_request_queue_head = req; - } else { - struct ThreadedRequest_t *last_in_queue = s_request_queue_head; - while (last_in_queue->next != NULL) { - last_in_queue = last_in_queue->next; - } - last_in_queue->next = req; - } + ASSERT_IS_LOCKED(s_request_queue_lock); + s_request_queue.push(req); +} + +static ThreadedRequest_t *dequeue_request(void) { + ThreadedRequest_t *result = NULL; + scoped_lock lock(s_request_queue_lock); + if (! s_request_queue.empty()) { + result = s_request_queue.front(); + s_request_queue.pop(); + } + return result; } /* The function that does thread work. */ @@ -99,13 +101,7 @@ static void *iothread_worker(void *threadPtr) { VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &set, NULL)); /* Grab a request off of the queue */ - struct ThreadedRequest_t *req; - VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock)); - req = s_request_queue_head; - if (req) { - s_request_queue_head = req->next; - } - VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock)); + struct ThreadedRequest_t *req = dequeue_request(); /* Run the handler and store the result */ if (req) { @@ -121,7 +117,8 @@ static void *iothread_worker(void *threadPtr) { /* Spawn another thread if there's work to be done. */ static void iothread_spawn_if_needed(void) { - if (s_request_queue_head != NULL && s_active_thread_count < IO_MAX_THREADS) { + ASSERT_IS_LOCKED(s_request_queue_lock); + if (! s_request_queue.empty() && s_active_thread_count < IO_MAX_THREADS) { struct WorkerThread_t *thread = next_vacant_thread_slot(); assert(thread != NULL); @@ -147,24 +144,19 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi /* Create and initialize a request. */ struct ThreadedRequest_t *req = new ThreadedRequest_t(); - req->next = NULL; req->handler = handler; req->completionCallback = completionCallback; req->context = context; req->sequenceNumber = ++s_last_sequence_number; - - /* Take the queue lock */ - VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock)); - - /* Add to the queue */ - add_to_queue(req); - - /* Spawn a thread if necessary */ - iothread_spawn_if_needed(); - - /* Unlock */ - VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock)); + + /* Take our lock */ + scoped_lock lock(s_request_queue_lock); + + /* Add to the queue */ + add_to_queue(req); + /* Spawn a thread if necessary */ + iothread_spawn_if_needed(); return 0; } @@ -174,6 +166,7 @@ int iothread_port(void) { } void iothread_service_completion(void) { + ASSERT_IS_MAIN_THREAD(); ThreadIndex_t threadIdx = (ThreadIndex_t)-1; VOMIT_ON_FAILURE(! read(iothread_port(), &threadIdx, sizeof threadIdx)); assert(threadIdx < IO_MAX_THREADS); @@ -201,3 +194,11 @@ void iothread_service_completion(void) { iothread_spawn_if_needed(); VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock)); } + +void iothread_drain_all(void) { + ASSERT_IS_MAIN_THREAD(); + ASSERT_IS_NOT_FORKED_CHILD(); + while (s_active_thread_count > 0) { + iothread_service_completion(); + } +} |