aboutsummaryrefslogtreecommitdiffhomepage
path: root/iothread.cpp
diff options
context:
space:
mode:
authorGravatar ridiculousfish <corydoras@ridiculousfish.com>2012-02-27 19:46:15 -0800
committerGravatar ridiculousfish <corydoras@ridiculousfish.com>2012-02-27 19:46:15 -0800
commitcf54ad8242056ae6d686fa6455b24427f4411e18 (patch)
treea1cc4a8bec1d1688114a19b07a1b29e6b7f9b361 /iothread.cpp
parentfdfa5c06028d3473f57cea497a6682a8e550352f (diff)
Change to wait for all outstanding iothreads before calling fork(). This should prevent a whole host of threading/fork interactions, but may also compromise performance...we'll see.
Diffstat (limited to 'iothread.cpp')
-rw-r--r--iothread.cpp71
1 files changed, 36 insertions, 35 deletions
diff --git a/iothread.cpp b/iothread.cpp
index b98dd8f2..3aed2aad 100644
--- a/iothread.cpp
+++ b/iothread.cpp
@@ -9,6 +9,7 @@
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
+#include <queue>
#define VOMIT_ON_FAILURE(a) do { if (0 != (a)) { int err = errno; fprintf(stderr, "%s failed on line %d in file %s: %d (%s)\n", #a, __LINE__, __FILE__, err, strerror(err)); abort(); }} while (0)
@@ -33,7 +34,6 @@ static struct WorkerThread_t {
} threads[IO_MAX_THREADS];
struct ThreadedRequest_t {
- struct ThreadedRequest_t *next;
int sequenceNumber;
int (*handler)(void *);
@@ -50,14 +50,14 @@ static struct WorkerThread_t *next_vacant_thread_slot(void) {
}
static pthread_mutex_t s_request_queue_lock;
-static struct ThreadedRequest_t *s_request_queue_head;
+static std::queue<ThreadedRequest_t *> s_request_queue;
static int s_last_sequence_number;
static int s_read_pipe, s_write_pipe;
static void iothread_init(void) {
- static int inited = 0;
+ static bool inited = false;
if (! inited) {
- inited = 1;
+ inited = true;
/* Initialize the queue lock */
VOMIT_ON_FAILURE(pthread_mutex_init(&s_request_queue_lock, NULL));
@@ -76,16 +76,18 @@ static void iothread_init(void) {
}
static void add_to_queue(struct ThreadedRequest_t *req) {
- //requires that the queue lock be held
- if (s_request_queue_head == NULL) {
- s_request_queue_head = req;
- } else {
- struct ThreadedRequest_t *last_in_queue = s_request_queue_head;
- while (last_in_queue->next != NULL) {
- last_in_queue = last_in_queue->next;
- }
- last_in_queue->next = req;
- }
+ ASSERT_IS_LOCKED(s_request_queue_lock);
+ s_request_queue.push(req);
+}
+
+static ThreadedRequest_t *dequeue_request(void) {
+ ThreadedRequest_t *result = NULL;
+ scoped_lock lock(s_request_queue_lock);
+ if (! s_request_queue.empty()) {
+ result = s_request_queue.front();
+ s_request_queue.pop();
+ }
+ return result;
}
/* The function that does thread work. */
@@ -99,13 +101,7 @@ static void *iothread_worker(void *threadPtr) {
VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &set, NULL));
/* Grab a request off of the queue */
- struct ThreadedRequest_t *req;
- VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock));
- req = s_request_queue_head;
- if (req) {
- s_request_queue_head = req->next;
- }
- VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
+ struct ThreadedRequest_t *req = dequeue_request();
/* Run the handler and store the result */
if (req) {
@@ -121,7 +117,8 @@ static void *iothread_worker(void *threadPtr) {
/* Spawn another thread if there's work to be done. */
static void iothread_spawn_if_needed(void) {
- if (s_request_queue_head != NULL && s_active_thread_count < IO_MAX_THREADS) {
+ ASSERT_IS_LOCKED(s_request_queue_lock);
+ if (! s_request_queue.empty() && s_active_thread_count < IO_MAX_THREADS) {
struct WorkerThread_t *thread = next_vacant_thread_slot();
assert(thread != NULL);
@@ -147,24 +144,19 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi
/* Create and initialize a request. */
struct ThreadedRequest_t *req = new ThreadedRequest_t();
- req->next = NULL;
req->handler = handler;
req->completionCallback = completionCallback;
req->context = context;
req->sequenceNumber = ++s_last_sequence_number;
-
- /* Take the queue lock */
- VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock));
-
- /* Add to the queue */
- add_to_queue(req);
-
- /* Spawn a thread if necessary */
- iothread_spawn_if_needed();
-
- /* Unlock */
- VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
+
+ /* Take our lock */
+ scoped_lock lock(s_request_queue_lock);
+
+ /* Add to the queue */
+ add_to_queue(req);
+ /* Spawn a thread if necessary */
+ iothread_spawn_if_needed();
return 0;
}
@@ -174,6 +166,7 @@ int iothread_port(void) {
}
void iothread_service_completion(void) {
+ ASSERT_IS_MAIN_THREAD();
ThreadIndex_t threadIdx = (ThreadIndex_t)-1;
VOMIT_ON_FAILURE(! read(iothread_port(), &threadIdx, sizeof threadIdx));
assert(threadIdx < IO_MAX_THREADS);
@@ -201,3 +194,11 @@ void iothread_service_completion(void) {
iothread_spawn_if_needed();
VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
}
+
+void iothread_drain_all(void) {
+ ASSERT_IS_MAIN_THREAD();
+ ASSERT_IS_NOT_FORKED_CHILD();
+ while (s_active_thread_count > 0) {
+ iothread_service_completion();
+ }
+}