diff options
Diffstat (limited to 'Database/Handle.hs')
-rw-r--r-- | Database/Handle.hs | 103 |
1 files changed, 74 insertions, 29 deletions
diff --git a/Database/Handle.hs b/Database/Handle.hs index 691902a80..4397a0dea 100644 --- a/Database/Handle.hs +++ b/Database/Handle.hs @@ -5,32 +5,33 @@ - Licensed under the GNU GPL version 3 or higher. -} +{-# LANGUAGE BangPatterns #-} + module Database.Handle ( DbHandle, openDb, runDb, - CommitPolicy(..), - runDb', commitDb, closeDb, + Size, + queueDb, + flushQueueDb, ) where import Utility.Exception import Messages -import Database.Persist.Sqlite (runSqlite) -import Database.Esqueleto hiding (Key) +import Database.Persist.Sqlite import Control.Monad import Control.Monad.IO.Class (liftIO) import Control.Concurrent import Control.Concurrent.Async import Control.Exception (throwIO) import qualified Data.Text as T -import Data.Time.Clock {- A DbHandle is a reference to a worker thread that communicates with - the database. It has a MVar which Jobs are submitted to. -} -data DbHandle = DbHandle (Async ()) (MVar Job) (MVar UTCTime) +data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue) data Job = RunJob (SqlPersistM ()) | CommitJob | CloseJob @@ -38,8 +39,8 @@ openDb :: FilePath -> IO DbHandle openDb db = do jobs <- newEmptyMVar worker <- async (workerThread (T.pack db) jobs) - t <- newMVar =<< getCurrentTime - return $ DbHandle worker jobs t + q <- newMVar emptyDbQueue + return $ DbHandle worker jobs q workerThread :: T.Text -> MVar Job -> IO () workerThread db jobs = catchNonAsync go showerr @@ -57,34 +58,27 @@ workerThread db jobs = catchNonAsync go showerr CommitJob -> return CommitJob CloseJob -> return CloseJob - -{- Runs an action using the DbHandle. +{- Runs an action using the DbHandle. The action may be a query, or it may + - make a change. Changes are bundled up in a transaction, which does not + - complete until commitDb is called. - - Note that the action is not run by the calling thread, but by a - worker thread. Exceptions are propigated to the calling thread. - - Only one action can be run at a time against a given DbHandle. - - If called concurrently, this will block until it is able to run. + - If called concurrently in the same process, this will block until + - it is able to run. + - + - Note that if multiple processes are trying to change the database + - at the same time, sqlite will only let one build a transaction at a + - time. -} runDb :: DbHandle -> SqlPersistM a -> IO a -runDb h = runDb' h CommitManually - -data CommitPolicy = CommitManually | CommitAfter NominalDiffTime - -runDb' :: DbHandle -> CommitPolicy -> SqlPersistM a -> IO a -runDb' h@(DbHandle _ jobs t) pol a = do +runDb (DbHandle _ jobs _) a = do res <- newEmptyMVar - putMVar jobs $ RunJob $ liftIO . putMVar res =<< tryNonAsync a - r <- either throwIO return =<< takeMVar res - case pol of - CommitManually -> return () - CommitAfter n -> do - now <- getCurrentTime - prev <- takeMVar t - putMVar t now - when (diffUTCTime now prev > n) $ - commitDb h - return r + putMVar jobs $ RunJob $ + liftIO . putMVar res =<< tryNonAsync a + either throwIO return =<< takeMVar res {- Commits any transaction that was created by the previous calls to runDb, - and starts a new transaction. -} @@ -92,6 +86,57 @@ commitDb :: DbHandle -> IO () commitDb (DbHandle _ jobs _) = putMVar jobs CommitJob closeDb :: DbHandle -> IO () -closeDb (DbHandle worker jobs _) = do +closeDb h@(DbHandle worker jobs _) = do + flushQueueDb h putMVar jobs CloseJob wait worker + +type Size = Int + +{- A queue of actions to perform, with a count of the number of actions + - queued. -} +data DbQueue = DbQueue Size (SqlPersistM ()) + +emptyDbQueue :: DbQueue +emptyDbQueue = DbQueue 0 (return ()) + +{- Queues a change to be committed to the database. It will be buffered + - to be committed later, unless the queue gets larger than the specified + - size. + - + - (Be sure to call closeDb or flushQueue to ensure the change gets committed.) + - + - Transactions built up by queueDb are sent to sqlite all at once. + - If sqlite fails due to another change being made concurrently by another + - process, the transaction is put back in the queue. This solves + - the sqlite multiple writer problem. + -} +queueDb :: DbHandle -> Size -> SqlPersistM () -> IO () +queueDb h@(DbHandle _ _ qvar) maxsz a = do + DbQueue sz qa <- takeMVar qvar + let !sz' = sz + 1 + let qa' = qa >> a + let enqueue = putMVar qvar (DbQueue sz' qa') + if sz' > maxsz + then do + r <- tryNonAsync $ do + runDb h qa' + commitDb h + case r of + Left _ -> enqueue + Right _ -> putMVar qvar emptyDbQueue + else enqueue + +{- If flushing the queue fails, this could be because there is another + - writer to the database. Retry repeatedly for up to 10 seconds. -} +flushQueueDb :: DbHandle -> IO () +flushQueueDb h@(DbHandle _ _ qvar) = do + DbQueue sz qa <- takeMVar qvar + when (sz > 0) $ + robustly 100 $ runDb h qa + where + robustly :: Int -> IO () -> IO () + robustly 0 _ = error "failed to commit changes to sqlite database" + robustly n a = catchNonAsync a $ \_ -> do + threadDelay 100000 -- 1/10th second + robustly (n-1) a |