aboutsummaryrefslogtreecommitdiffhomepage
path: root/iothread.cpp
diff options
context:
space:
mode:
authorGravatar ridiculousfish <corydoras@ridiculousfish.com>2011-12-26 21:21:12 -0800
committerGravatar ridiculousfish <corydoras@ridiculousfish.com>2011-12-26 21:21:12 -0800
commit74a1d70b8a713db1c56b0e521e7e62dbda7e7a37 (patch)
tree7f3b866ddf14ea312b09d7668641e19265ce8699 /iothread.cpp
parent165a5aaa83d40079cb4fcc1e76fda4b1ed1f993e (diff)
Added iothread
Diffstat (limited to 'iothread.cpp')
-rw-r--r--iothread.cpp191
1 files changed, 191 insertions, 0 deletions
diff --git a/iothread.cpp b/iothread.cpp
new file mode 100644
index 00000000..3f83c9e5
--- /dev/null
+++ b/iothread.cpp
@@ -0,0 +1,191 @@
+#include "iothread.h"
+#include <pthread.h>
+#include <assert.h>
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+
+#define VOMIT_ON_FAILURE(a) do { if (0 != (a)) { int err = errno; fprintf(stderr, "%s failed on line %d in file %s: %d (%s)\n", #a, __LINE__, __FILE__, err, strerror(err)); abort(); }} while (0)
+
+#ifdef _POSIX_THREAD_THREADS_MAX
+ #if _POSIX_THREAD_THREADS_MAX < 64
+ #define IO_MAX_THREADS _POSIX_THREAD_THREADS_MAX
+ #endif
+#endif
+
+#ifndef IO_MAX_THREADS
+ #define IO_MAX_THREADS 64
+#endif
+
+static int s_active_thread_count;
+
+typedef unsigned char ThreadIndex_t;
+
+static struct WorkerThread_t {
+ ThreadIndex_t idx;
+ pthread_t thread;
+} threads[IO_MAX_THREADS];
+
+struct ThreadedRequest_t {
+ struct ThreadedRequest_t *next;
+ int sequenceNumber;
+
+ int (*handler)(void *);
+ void (*completionCallback)(void *, int);
+ void *context;
+ int handlerResult;
+};
+
+static struct WorkerThread_t *next_vacant_thread_slot(void) {
+ for (ThreadIndex_t i=0; i < IO_MAX_THREADS; i++) {
+ if (! threads[i].thread) return &threads[i];
+ }
+ return NULL;
+}
+
+static pthread_mutex_t s_request_queue_lock;
+static struct ThreadedRequest_t *s_request_queue_head;
+static int s_last_sequence_number;
+static int s_read_pipe, s_write_pipe;
+
+static void iothread_init(void) {
+ static int inited = 0;
+ if (! inited) {
+ inited = 1;
+
+ /* Initialize the queue lock */
+ VOMIT_ON_FAILURE(pthread_mutex_init(&s_request_queue_lock, NULL));
+
+ /* Initialize the completion pipes */
+ int pipes[2] = {0, 0};
+ VOMIT_ON_FAILURE(pipe(pipes));
+ s_read_pipe = pipes[0];
+ s_write_pipe = pipes[1];
+
+ /* Tell each thread its index */
+ for (ThreadIndex_t i=0; i < IO_MAX_THREADS; i++) {
+ threads[i].idx = i;
+ }
+ }
+}
+
+static void add_to_queue(struct ThreadedRequest_t *req) {
+ //requires that the queue lock be held
+ if (s_request_queue_head == NULL) {
+ s_request_queue_head = req;
+ } else {
+ struct ThreadedRequest_t *last_in_queue = s_request_queue_head;
+ while (last_in_queue->next != NULL) {
+ last_in_queue = last_in_queue->next;
+ }
+ last_in_queue->next = req;
+ }
+}
+
+/* The function that does thread work. */
+static void *iothread_worker(void *threadPtr) {
+ assert(threadPtr != NULL);
+ struct WorkerThread_t *thread = (struct WorkerThread_t *)threadPtr;
+
+ /* Grab a request off of the queue */
+ struct ThreadedRequest_t *req;
+ VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock));
+ req = s_request_queue_head;
+ if (req) {
+ s_request_queue_head = req->next;
+ }
+ VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
+
+ /* Run the handler and store the result */
+ if (req) {
+ req->handlerResult = req->handler(req->context);
+ }
+
+ /* Write our index to wake up the main thread */
+ VOMIT_ON_FAILURE(! write(s_write_pipe, &thread->idx, sizeof thread->idx));
+
+ /* We're done */
+ return req;
+}
+
+/* Spawn another thread if there's work to be done. */
+static void iothread_spawn_if_needed(void) {
+ if (s_request_queue_head != NULL && s_active_thread_count < IO_MAX_THREADS) {
+ struct WorkerThread_t *thread = next_vacant_thread_slot();
+ assert(thread != NULL);
+
+ /* Spawn a thread */
+ int err;
+ do {
+ err = 0;
+ if (pthread_create(&thread->thread, NULL, iothread_worker, thread)) {
+ err = errno;
+ }
+ } while (err == EAGAIN);
+ assert(err == 0);
+
+ /* Note that we are spawned another thread */
+ s_active_thread_count += 1;
+ }
+}
+
+int iothread_perform(int (*handler)(void *), void (*completionCallback)(void *, int), void *context) {
+ iothread_init();
+
+ /* Create and initialize a request. */
+ struct ThreadedRequest_t *req = (struct ThreadedRequest_t *)malloc(sizeof *req);
+ req->next = NULL;
+ req->handler = handler;
+ req->completionCallback = completionCallback;
+ req->context = context;
+ req->sequenceNumber = ++s_last_sequence_number;
+
+ /* Take the queue lock */
+ VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock));
+
+ /* Add to the queue */
+ add_to_queue(req);
+
+ /* Spawn a thread if necessary */
+ iothread_spawn_if_needed();
+
+ /* Unlock */
+ VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
+
+ return 0;
+}
+
+int iothread_port(void) {
+ iothread_init();
+ return s_read_pipe;
+}
+
+void iothread_service_completion(void) {
+ ThreadIndex_t threadIdx = (ThreadIndex_t)-1;
+ VOMIT_ON_FAILURE(! read(iothread_port(), &threadIdx, sizeof threadIdx));
+ assert(threadIdx < IO_MAX_THREADS);
+
+ struct WorkerThread_t *thread = &threads[threadIdx];
+ assert(thread->thread != 0);
+
+ struct ThreadedRequest_t *req = NULL;
+ VOMIT_ON_FAILURE(pthread_join(thread->thread, (void **)&req));
+
+ /* Free up this thread */
+ thread->thread = 0;
+ assert(s_active_thread_count > 0);
+ s_active_thread_count -= 1;
+
+ /* Handle the request */
+ if (req && req->completionCallback) {
+ req->completionCallback(req->context, req->handlerResult);
+ }
+
+ /* Maybe spawn another thread, if there's more work to be done. */
+ VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock));
+ iothread_spawn_if_needed();
+ VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock));
+}