aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/iothread.cpp
diff options
context:
space:
mode:
authorGravatar Kurtis Rader <krader@skepticism.us>2016-05-01 21:01:00 -0700
committerGravatar Kurtis Rader <krader@skepticism.us>2016-05-01 21:54:54 -0700
commit483b7988632d864c65e4b68fc4a92133b172bbfe (patch)
tree495b440337b569ea8f0921fcfd9abb3ef55fb9da /src/iothread.cpp
parentb19bfc0dd3e9f5a0f8530ebccf207cef0824de7e (diff)
restyle iothread module to match project style
Reduces lint errors from 41 to 26 (-37%). Line count from 444 to 423 (-5%). Another step in resolving issue #2902.
Diffstat (limited to 'src/iothread.cpp')
-rw-r--r--src/iothread.cpp284
1 files changed, 136 insertions, 148 deletions
diff --git a/src/iothread.cpp b/src/iothread.cpp
index 3623850e..96a926a2 100644
--- a/src/iothread.cpp
+++ b/src/iothread.cpp
@@ -1,17 +1,17 @@
-#include <pthread.h>
#include <assert.h>
-#include <stdio.h>
+#include <fcntl.h>
#include <limits.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdio.h>
#include <sys/select.h>
#include <sys/types.h>
#include <unistd.h>
-#include <signal.h>
-#include <fcntl.h>
#include <queue>
-#include <stdbool.h>
-#include "iothread.h"
#include "common.h"
+#include "iothread.h"
#ifdef _POSIX_THREAD_THREADS_MAX
#if _POSIX_THREAD_THREADS_MAX < 64
@@ -23,7 +23,7 @@
#define IO_MAX_THREADS 64
#endif
-/* Values for the wakeup bytes sent to the ioport */
+// Values for the wakeup bytes sent to the ioport.
#define IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE 99
#define IO_SERVICE_RESULT_QUEUE 100
@@ -32,23 +32,22 @@
static void iothread_service_main_thread_requests(void);
static void iothread_service_result_queue();
-struct SpawnRequest_t
-{
+struct SpawnRequest_t {
int (*handler)(void *);
void (*completionCallback)(void *, int);
void *context;
int handlerResult;
};
-struct MainThreadRequest_t
-{
+struct MainThreadRequest_t {
int (*handler)(void *);
void *context;
volatile int handlerResult;
volatile bool done;
};
-/* Spawn support. Requests are allocated and come in on request_queue. They go out on result_queue, at which point they can be deallocated. s_active_thread_count is also protected by the lock. */
+// Spawn support. Requests are allocated and come in on request_queue. They go out on result_queue,
+// at which point they can be deallocated. s_active_thread_count is also protected by the lock.
static pthread_mutex_t s_spawn_queue_lock;
static std::queue<SpawnRequest_t *> s_request_queue;
static int s_active_thread_count;
@@ -56,140 +55,134 @@ static int s_active_thread_count;
static pthread_mutex_t s_result_queue_lock;
static std::queue<SpawnRequest_t *> s_result_queue;
-/* "Do on main thread" support */
-static pthread_mutex_t s_main_thread_performer_lock; // protects the main thread requests
-static pthread_cond_t s_main_thread_performer_condition; //protects the main thread requests
-static pthread_mutex_t s_main_thread_request_queue_lock; // protects the queue
+// "Do on main thread" support.
+static pthread_mutex_t s_main_thread_performer_lock; // protects the main thread requests
+static pthread_cond_t s_main_thread_performer_condition; // protects the main thread requests
+static pthread_mutex_t s_main_thread_request_queue_lock; // protects the queue
static std::queue<MainThreadRequest_t *> s_main_thread_request_queue;
-/* Notifying pipes */
+// Notifying pipes.
static int s_read_pipe, s_write_pipe;
-static void iothread_init(void)
-{
+static void iothread_init(void) {
static bool inited = false;
- if (! inited)
- {
+ if (!inited) {
inited = true;
- /* Initialize some locks */
+ // Initialize some locks.
VOMIT_ON_FAILURE(pthread_mutex_init(&s_spawn_queue_lock, NULL));
VOMIT_ON_FAILURE(pthread_mutex_init(&s_result_queue_lock, NULL));
VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_request_queue_lock, NULL));
VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_performer_lock, NULL));
VOMIT_ON_FAILURE(pthread_cond_init(&s_main_thread_performer_condition, NULL));
- /* Initialize the completion pipes */
+ // Initialize the completion pipes.
int pipes[2] = {0, 0};
VOMIT_ON_FAILURE(pipe(pipes));
s_read_pipe = pipes[0];
s_write_pipe = pipes[1];
- // 0 means success to VOMIT_ON_FAILURE. Arrange to pass 0 if fcntl returns anything other than -1.
+ // 0 means success to VOMIT_ON_FAILURE. Arrange to pass 0 if fcntl returns anything other
+ // than -1.
VOMIT_ON_FAILURE(-1 == fcntl(s_read_pipe, F_SETFD, FD_CLOEXEC));
VOMIT_ON_FAILURE(-1 == fcntl(s_write_pipe, F_SETFD, FD_CLOEXEC));
}
}
-static void add_to_queue(struct SpawnRequest_t *req)
-{
+static void add_to_queue(struct SpawnRequest_t *req) {
ASSERT_IS_LOCKED(s_spawn_queue_lock);
s_request_queue.push(req);
}
-static SpawnRequest_t *dequeue_spawn_request(void)
-{
+static SpawnRequest_t *dequeue_spawn_request(void) {
ASSERT_IS_LOCKED(s_spawn_queue_lock);
SpawnRequest_t *result = NULL;
- if (! s_request_queue.empty())
- {
+ if (!s_request_queue.empty()) {
result = s_request_queue.front();
s_request_queue.pop();
}
return result;
}
-static void enqueue_thread_result(SpawnRequest_t *req)
-{
+static void enqueue_thread_result(SpawnRequest_t *req) {
scoped_lock lock(s_result_queue_lock);
s_result_queue.push(req);
}
-static void *this_thread()
-{
- return (void *)(intptr_t)pthread_self();
-}
+static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
-/* The function that does thread work. */
-static void *iothread_worker(void *unused)
-{
+/// The function that does thread work.
+static void *iothread_worker(void *unused) {
scoped_lock locker(s_spawn_queue_lock);
struct SpawnRequest_t *req;
- while ((req = dequeue_spawn_request()) != NULL)
- {
+ while ((req = dequeue_spawn_request()) != NULL) {
IOTHREAD_LOG fprintf(stderr, "pthread %p dequeued %p\n", this_thread(), req);
- /* Unlock the queue while we execute the request */
+ // Unlock the queue while we execute the request.
locker.unlock();
-
- /* Perfor the work */
+
+ // Perform the work.
req->handlerResult = req->handler(req->context);
-
- /* If there's a completion handler, we have to enqueue it on the result queue. Otherwise, we can just delete the request! */
- if (req->completionCallback == NULL)
- {
+
+ // If there's a completion handler, we have to enqueue it on the result queue. Otherwise, we
+ // can just delete the request!
+ if (req->completionCallback == NULL) {
delete req;
- }
- else
- {
- /* Enqueue the result, and tell the main thread about it */
+ } else {
+ // Enqueue the result, and tell the main thread about it.
enqueue_thread_result(req);
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
- VOMIT_ON_FAILURE(! write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
+ VOMIT_ON_FAILURE(!write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
}
-
- /* Lock us up again */
+
+ // Lock us up again.
locker.lock();
}
-
- /* We believe we have exhausted the thread request queue. We want to decrement s_active_thread_count and exit. But it's possible that a request just came in. Furthermore, it's possible that the main thread saw that s_active_thread_count is full, and decided to not spawn a new thread, trusting in one of the existing threads to handle it. But we've already committed to not handling anything else. Therefore, we have to decrement s_active_thread_count under the lock, which we still hold. Likewise, the main thread must check the value under the lock. */
+
+ // We believe we have exhausted the thread request queue. We want to decrement
+ // s_active_thread_count and exit. But it's possible that a request just came in. Furthermore,
+ // it's possible that the main thread saw that s_active_thread_count is full, and decided to not
+ // spawn a new thread, trusting in one of the existing threads to handle it. But we've already
+ // committed to not handling anything else. Therefore, we have to decrement
+ // s_active_thread_count under the lock, which we still hold. Likewise, the main thread must
+ // check the value under the lock.
ASSERT_IS_LOCKED(s_spawn_queue_lock);
assert(s_active_thread_count > 0);
s_active_thread_count -= 1;
-
- IOTHREAD_LOG fprintf(stderr, "pthread %p exiting\n", this_thread());
- /* We're done */
+ IOTHREAD_LOG fprintf(stderr, "pthread %p exiting\n", this_thread());
+ // We're done.
return NULL;
}
-/* Spawn another thread. No lock is held when this is called. */
-static void iothread_spawn()
-{
- /* The spawned thread inherits our signal mask. We don't want the thread to ever receive signals on the spawned thread, so temporarily block all signals, spawn the thread, and then restore it. */
+/// Spawn another thread. No lock is held when this is called.
+static void iothread_spawn() {
+ // The spawned thread inherits our signal mask. We don't want the thread to ever receive signals
+ // on the spawned thread, so temporarily block all signals, spawn the thread, and then restore
+ // it.
sigset_t new_set, saved_set;
sigfillset(&new_set);
VOMIT_ON_FAILURE(pthread_sigmask(SIG_BLOCK, &new_set, &saved_set));
- /* Spawn a thread. If this fails, it means there's already a bunch of threads; it is very unlikely that they are all on the verge of exiting, so one is likely to be ready to handle extant requests. So we can ignore failure with some confidence. */
+ // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
+ // unlikely that they are all on the verge of exiting, so one is likely to be ready to handle
+ // extant requests. So we can ignore failure with some confidence.
pthread_t thread = 0;
pthread_create(&thread, NULL, iothread_worker, NULL);
-
- /* We will never join this thread */
+
+ // We will never join this thread.
VOMIT_ON_FAILURE(pthread_detach(thread));
-
IOTHREAD_LOG fprintf(stderr, "pthread %p spawned\n", (void *)(intptr_t)thread);
-
- /* Restore our sigmask */
+ // Restore our sigmask.
VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, NULL));
}
-int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(void *, int), void *context)
-{
+int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(void *, int),
+ void *context) {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
iothread_init();
- /* Create and initialize a request. */
+ // Create and initialize a request.
struct SpawnRequest_t *req = new SpawnRequest_t();
req->handler = handler;
req->completionCallback = completionCallback;
@@ -198,59 +191,56 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi
int local_thread_count = -1;
bool spawn_new_thread = false;
{
- /* Lock around a local region. Note that we can only access s_active_thread_count under the lock. */
+ // Lock around a local region. Note that we can only access s_active_thread_count under the
+ // lock.
scoped_lock lock(s_spawn_queue_lock);
add_to_queue(req);
- if (s_active_thread_count < IO_MAX_THREADS)
- {
+ if (s_active_thread_count < IO_MAX_THREADS) {
s_active_thread_count++;
spawn_new_thread = true;
}
local_thread_count = s_active_thread_count;
}
-
- /* Kick off the thread if we decided to do so */
- if (spawn_new_thread)
- {
+
+ // Kick off the thread if we decided to do so.
+ if (spawn_new_thread) {
iothread_spawn();
}
-
- /* We return the active thread count for informational purposes only */
+
+ // We return the active thread count for informational purposes only.
return local_thread_count;
}
-int iothread_port(void)
-{
+int iothread_port(void) {
iothread_init();
return s_read_pipe;
}
-void iothread_service_completion(void)
-{
+void iothread_service_completion(void) {
ASSERT_IS_MAIN_THREAD();
char wakeup_byte = 0;
VOMIT_ON_FAILURE(1 != read_loop(iothread_port(), &wakeup_byte, sizeof wakeup_byte));
- switch (wakeup_byte)
- {
- case IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE:
+ switch (wakeup_byte) {
+ case IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE: {
iothread_service_main_thread_requests();
break;
- case IO_SERVICE_RESULT_QUEUE:
+ }
+ case IO_SERVICE_RESULT_QUEUE: {
iothread_service_result_queue();
break;
- default:
+ }
+ default: {
fprintf(stderr, "Unknown wakeup byte %02x in %s\n", wakeup_byte, __FUNCTION__);
break;
+ }
}
}
-static bool iothread_wait_for_pending_completions(long timeout_usec)
-{
+static bool iothread_wait_for_pending_completions(long timeout_usec) {
const long usec_per_sec = 1000000;
struct timeval tv;
tv.tv_sec = timeout_usec / usec_per_sec;
tv.tv_usec = timeout_usec % usec_per_sec;
-
const int fd = iothread_port();
fd_set fds;
FD_ZERO(&fds);
@@ -259,29 +249,31 @@ static bool iothread_wait_for_pending_completions(long timeout_usec)
return ret > 0;
}
-/* Note that this function is quite sketchy. In particular, it drains threads, not requests, meaning that it may leave requests on the queue. This is the desired behavior (it may be called before fork, and we don't want to bother servicing requests before we fork), but in the test suite we depend on it draining all requests. In practice, this works, because a thread in practice won't exit while there is outstanding requests.
-
- At the moment, this function is only used in the test suite and in a drain-all-threads-before-fork compatibility mode that no architecture requires, so it's OK that it's terrible.
-*/
-void iothread_drain_all(void)
-{
+/// Note that this function is quite sketchy. In particular, it drains threads, not requests,
+/// meaning that it may leave requests on the queue. This is the desired behavior (it may be called
+/// before fork, and we don't want to bother servicing requests before we fork), but in the test
+/// suite we depend on it draining all requests. In practice, this works, because a thread in
+/// practice won't exit while there is outstanding requests.
+///
+/// At the moment, this function is only used in the test suite and in a
+/// drain-all-threads-before-fork compatibility mode that no architecture requires, so it's OK that
+/// it's terrible.
+void iothread_drain_all(void) {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
-
+
scoped_lock locker(s_spawn_queue_lock);
-
+
#define TIME_DRAIN 0
#if TIME_DRAIN
int thread_count = s_active_thread_count;
double now = timef();
#endif
-
- /* Nasty polling via select(). */
- while (s_active_thread_count > 0)
- {
+
+ // Nasty polling via select().
+ while (s_active_thread_count > 0) {
locker.unlock();
- if (iothread_wait_for_pending_completions(1000))
- {
+ if (iothread_wait_for_pending_completions(1000)) {
iothread_service_completion();
}
locker.lock();
@@ -292,70 +284,64 @@ void iothread_drain_all(void)
#endif
}
-/* "Do on main thread" support */
-static void iothread_service_main_thread_requests(void)
-{
+/// "Do on main thread" support.
+static void iothread_service_main_thread_requests(void) {
ASSERT_IS_MAIN_THREAD();
- // Move the queue to a local variable
+ // Move the queue to a local variable.
std::queue<MainThreadRequest_t *> request_queue;
{
scoped_lock queue_lock(s_main_thread_request_queue_lock);
std::swap(request_queue, s_main_thread_request_queue);
}
- if (! request_queue.empty())
- {
- // Perform each of the functions
- // Note we are NOT responsible for deleting these. They are stack allocated in their respective threads!
- while (! request_queue.empty())
- {
+ if (!request_queue.empty()) {
+ // Perform each of the functions. Note we are NOT responsible for deleting these. They are
+ // stack allocated in their respective threads!
+ while (!request_queue.empty()) {
MainThreadRequest_t *req = request_queue.front();
request_queue.pop();
req->handlerResult = req->handler(req->context);
req->done = true;
}
- /* Ok, we've handled everybody. Announce the good news, and allow ourselves to be unlocked. Note we must do this while holding the lock. Otherwise we race with the waiting threads:
- 1. waiting thread checks for done, sees false
- 2. main thread performs request, sets done to true, posts to condition
- 3. waiting thread unlocks lock, waits on condition (forever)
- Because the waiting thread performs step 1 under the lock, if we take the lock, we avoid posting before the waiting thread is waiting.
- */
+ // Ok, we've handled everybody. Announce the good news, and allow ourselves to be unlocked.
+ // Note we must do this while holding the lock. Otherwise we race with the waiting threads:
+ //
+ // 1. waiting thread checks for done, sees false
+ // 2. main thread performs request, sets done to true, posts to condition
+ // 3. waiting thread unlocks lock, waits on condition (forever)
+ //
+ // Because the waiting thread performs step 1 under the lock, if we take the lock, we avoid
+ // posting before the waiting thread is waiting.
scoped_lock broadcast_lock(s_main_thread_performer_lock);
VOMIT_ON_FAILURE(pthread_cond_broadcast(&s_main_thread_performer_condition));
}
}
/* Service the queue of results */
-static void iothread_service_result_queue()
-{
- // Move the queue to a local variable
+static void iothread_service_result_queue() {
+ // Move the queue to a local variable.
std::queue<SpawnRequest_t *> result_queue;
{
scoped_lock queue_lock(s_result_queue_lock);
std::swap(result_queue, s_result_queue);
}
-
- // Perform each completion in order
- // We are responsibile for cleaning them up
- while (! result_queue.empty())
- {
+
+ // Perform each completion in order. We are responsibile for cleaning them up.
+ while (!result_queue.empty()) {
SpawnRequest_t *req = result_queue.front();
result_queue.pop();
- if (req->completionCallback)
- {
+ if (req->completionCallback) {
req->completionCallback(req->context, req->handlerResult);
}
delete req;
}
}
-int iothread_perform_on_main_base(int (*handler)(void *), void *context)
-{
- // If this is the main thread, just do it
- if (is_main_thread())
- {
+int iothread_perform_on_main_base(int (*handler)(void *), void *context) {
+ // If this is the main thread, just do it.
+ if (is_main_thread()) {
return handler(context);
}
@@ -366,25 +352,27 @@ int iothread_perform_on_main_base(int (*handler)(void *), void *context)
req.handlerResult = 0;
req.done = false;
- // Append it
+ // Append it. Do not delete the nested scope as it is crucial to the proper functioning of this
+ // code by virtue of the lock management.
{
scoped_lock queue_lock(s_main_thread_request_queue_lock);
s_main_thread_request_queue.push(&req);
}
- // Tell the pipe
+ // Tell the pipe.
const char wakeup_byte = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE;
- VOMIT_ON_FAILURE(! write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
+ VOMIT_ON_FAILURE(!write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
- // Wait on the condition, until we're done
+ // Wait on the condition, until we're done.
scoped_lock perform_lock(s_main_thread_performer_lock);
- while (! req.done)
- {
- // It would be nice to support checking for cancellation here, but the clients need a deterministic way to clean up to avoid leaks
- VOMIT_ON_FAILURE(pthread_cond_wait(&s_main_thread_performer_condition, &s_main_thread_performer_lock));
+ while (!req.done) {
+ // It would be nice to support checking for cancellation here, but the clients need a
+ // deterministic way to clean up to avoid leaks
+ VOMIT_ON_FAILURE(
+ pthread_cond_wait(&s_main_thread_performer_condition, &s_main_thread_performer_lock));
}
- // Ok, the request must now be done
+ // Ok, the request must now be done.
assert(req.done);
return req.handlerResult;
}