diff options
Diffstat (limited to 'Database/Handle.hs')
-rw-r--r-- | Database/Handle.hs | 164 |
1 files changed, 58 insertions, 106 deletions
diff --git a/Database/Handle.hs b/Database/Handle.hs index 67f759265..a45fad22e 100644 --- a/Database/Handle.hs +++ b/Database/Handle.hs @@ -11,16 +11,14 @@ module Database.Handle ( DbHandle, initDb, openDb, + TableName, queryDb, closeDb, - Size, - queueDb, - flushQueueDb, commitDb, + commitDb', ) where import Utility.Exception -import Utility.Monad import Database.Persist.Sqlite import qualified Database.Sqlite as Sqlite @@ -33,18 +31,17 @@ import qualified Data.Text as T import Control.Monad.Trans.Resource (runResourceT) import Control.Monad.Logger (runNoLoggingT) import Data.List -import Data.Time.Clock import System.IO {- A DbHandle is a reference to a worker thread that communicates with - the database. It has a MVar which Jobs are submitted to. -} -data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue) +data DbHandle = DbHandle (Async ()) (MVar Job) {- Ensures that the database is initialized. Pass the migration action for - the database. - - - The database is put into WAL mode, to prevent readers from blocking - - writers, and prevent a writer from blocking readers. + - The database is initialized using WAL mode, to prevent readers + - from blocking writers, and prevent a writer from blocking readers. -} initDb :: FilePath -> SqlPersistM () -> IO () initDb f migration = do @@ -60,22 +57,71 @@ enableWAL db = do void $ Sqlite.finalize stmt Sqlite.close conn +{- Name of a table that should exist once the database is initialized. -} +type TableName = String + {- Opens the database, but does not perform any migrations. Only use - if the database is known to exist and have the right tables. -} openDb :: FilePath -> TableName -> IO DbHandle openDb db tablename = do jobs <- newEmptyMVar worker <- async (workerThread (T.pack db) tablename jobs) - q <- newMVar =<< emptyDbQueue - return $ DbHandle worker jobs q + return $ DbHandle worker jobs + +closeDb :: DbHandle -> IO () +closeDb (DbHandle worker jobs) = do + putMVar jobs CloseJob + wait worker + +{- Makes a query using the DbHandle. This should not be used to make + - changes to the database! + - + - Note that the action is not run by the calling thread, but by a + - worker thread. Exceptions are propigated to the calling thread. + - + - Only one action can be run at a time against a given DbHandle. + - If called concurrently in the same process, this will block until + - it is able to run. + -} +queryDb :: DbHandle -> SqlPersistM a -> IO a +queryDb (DbHandle _ jobs) a = do + res <- newEmptyMVar + putMVar jobs $ QueryJob $ + liftIO . putMVar res =<< tryNonAsync a + (either throwIO return =<< takeMVar res) + `catchNonAsync` (const $ error "sqlite query crashed") + +{- Writes a change to the database. + - + - If a database is opened multiple times and there's a concurrent writer, + - the write could fail. Retries repeatedly for up to 10 seconds, + - which should avoid all but the most exceptional problems. + -} +commitDb :: DbHandle -> SqlPersistM () -> IO () +commitDb h wa = robustly Nothing 100 (commitDb' h wa) + where + robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO () + robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e + robustly _ n a = do + r <- a + case r of + Right _ -> return () + Left e -> do + threadDelay 100000 -- 1/10th second + robustly (Just e) (n-1) a + +commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ()) +commitDb' (DbHandle _ jobs) a = do + res <- newEmptyMVar + putMVar jobs $ ChangeJob $ \runner -> + liftIO $ putMVar res =<< tryNonAsync (runner a) + takeMVar res data Job = QueryJob (SqlPersistM ()) | ChangeJob ((SqlPersistM () -> IO ()) -> IO ()) | CloseJob -type TableName = String - workerThread :: T.Text -> TableName -> MVar Job -> IO () workerThread db tablename jobs = catchNonAsync (run loop) showerr where @@ -121,97 +167,3 @@ workerThread db tablename jobs = catchNonAsync (run loop) showerr -- This should succeed for any table. nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1" - -{- Makes a query using the DbHandle. This should not be used to make - - changes to the database! - - - - Note that the action is not run by the calling thread, but by a - - worker thread. Exceptions are propigated to the calling thread. - - - - Only one action can be run at a time against a given DbHandle. - - If called concurrently in the same process, this will block until - - it is able to run. - -} -queryDb :: DbHandle -> SqlPersistM a -> IO a -queryDb (DbHandle _ jobs _) a = do - res <- newEmptyMVar - putMVar jobs $ QueryJob $ - liftIO . putMVar res =<< tryNonAsync a - (either throwIO return =<< takeMVar res) - `catchNonAsync` (const $ error "sqlite query crashed") - -closeDb :: DbHandle -> IO () -closeDb h@(DbHandle worker jobs _) = do - putMVar jobs CloseJob - wait worker - flushQueueDb h - -type Size = Int - -type LastCommitTime = UTCTime - -{- A queue of actions to perform, with a count of the number of actions - - queued, and a last commit time. -} -data DbQueue = DbQueue Size LastCommitTime (SqlPersistM ()) - -emptyDbQueue :: IO DbQueue -emptyDbQueue = do - now <- getCurrentTime - return $ DbQueue 0 now (return ()) - -{- Queues a change to be made to the database. It will be buffered - - to be committed later, unless the commitchecker action returns true. - - - - (Be sure to call closeDb or flushQueueDb to ensure the change - - gets committed.) - - - - Transactions built up by queueDb are sent to sqlite all at once. - - If sqlite fails due to another change being made concurrently by another - - process, the transaction is put back in the queue. This solves - - the sqlite multiple writer problem. - -} -queueDb - :: DbHandle - -> (Size -> LastCommitTime -> IO Bool) - -> SqlPersistM () - -> IO () -queueDb h@(DbHandle _ _ qvar) commitchecker a = do - DbQueue sz lastcommittime qa <- takeMVar qvar - let !sz' = sz + 1 - let qa' = qa >> a - let enqueue = putMVar qvar - ifM (commitchecker sz' lastcommittime) - ( do - r <- commitDb h qa' - case r of - Left _ -> enqueue $ DbQueue sz' lastcommittime qa' - Right _ -> do - now <- getCurrentTime - enqueue $ DbQueue 0 now (return ()) - , enqueue $ DbQueue sz' lastcommittime qa' - ) - -{- If flushing the queue fails, this could be because there is another - - writer to the database. Retry repeatedly for up to 10 seconds. -} -flushQueueDb :: DbHandle -> IO () -flushQueueDb h@(DbHandle _ _ qvar) = do - DbQueue sz _ qa <- takeMVar qvar - when (sz > 0) $ - robustly Nothing 100 (commitDb h qa) - where - robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO () - robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e - robustly _ n a = do - r <- a - case r of - Right _ -> return () - Left e -> do - threadDelay 100000 -- 1/10th second - robustly (Just e) (n-1) a - -commitDb :: DbHandle -> SqlPersistM () -> IO (Either SomeException ()) -commitDb (DbHandle _ jobs _) a = do - res <- newEmptyMVar - putMVar jobs $ ChangeJob $ \runner -> - liftIO $ putMVar res =<< tryNonAsync (runner a) - takeMVar res |