aboutsummaryrefslogtreecommitdiff
path: root/Database/Handle.hs
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2015-12-23 14:59:58 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2015-12-23 14:59:58 -0400
commit1aee8106ac52b8337a12eb7b29bdf2ce2f3b4ca0 (patch)
treee913b730bab350207734a36d9dc2fd05d2393427 /Database/Handle.hs
parent8693130a22747fe90c6f950ee43d56ebc8a7c236 (diff)
split out Database.Queue from Database.Handle
Fsck can use the queue for efficiency since it is write-heavy, and only reads a value before writing it. But, the queue is not suited to the Keys database.
Diffstat (limited to 'Database/Handle.hs')
-rw-r--r--Database/Handle.hs164
1 files changed, 58 insertions, 106 deletions
diff --git a/Database/Handle.hs b/Database/Handle.hs
index 67f759265..a45fad22e 100644
--- a/Database/Handle.hs
+++ b/Database/Handle.hs
@@ -11,16 +11,14 @@ module Database.Handle (
DbHandle,
initDb,
openDb,
+ TableName,
queryDb,
closeDb,
- Size,
- queueDb,
- flushQueueDb,
commitDb,
+ commitDb',
) where
import Utility.Exception
-import Utility.Monad
import Database.Persist.Sqlite
import qualified Database.Sqlite as Sqlite
@@ -33,18 +31,17 @@ import qualified Data.Text as T
import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Logger (runNoLoggingT)
import Data.List
-import Data.Time.Clock
import System.IO
{- A DbHandle is a reference to a worker thread that communicates with
- the database. It has a MVar which Jobs are submitted to. -}
-data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue)
+data DbHandle = DbHandle (Async ()) (MVar Job)
{- Ensures that the database is initialized. Pass the migration action for
- the database.
-
- - The database is put into WAL mode, to prevent readers from blocking
- - writers, and prevent a writer from blocking readers.
+ - The database is initialized using WAL mode, to prevent readers
+ - from blocking writers, and prevent a writer from blocking readers.
-}
initDb :: FilePath -> SqlPersistM () -> IO ()
initDb f migration = do
@@ -60,22 +57,71 @@ enableWAL db = do
void $ Sqlite.finalize stmt
Sqlite.close conn
+{- Name of a table that should exist once the database is initialized. -}
+type TableName = String
+
{- Opens the database, but does not perform any migrations. Only use
- if the database is known to exist and have the right tables. -}
openDb :: FilePath -> TableName -> IO DbHandle
openDb db tablename = do
jobs <- newEmptyMVar
worker <- async (workerThread (T.pack db) tablename jobs)
- q <- newMVar =<< emptyDbQueue
- return $ DbHandle worker jobs q
+ return $ DbHandle worker jobs
+
+closeDb :: DbHandle -> IO ()
+closeDb (DbHandle worker jobs) = do
+ putMVar jobs CloseJob
+ wait worker
+
+{- Makes a query using the DbHandle. This should not be used to make
+ - changes to the database!
+ -
+ - Note that the action is not run by the calling thread, but by a
+ - worker thread. Exceptions are propigated to the calling thread.
+ -
+ - Only one action can be run at a time against a given DbHandle.
+ - If called concurrently in the same process, this will block until
+ - it is able to run.
+ -}
+queryDb :: DbHandle -> SqlPersistM a -> IO a
+queryDb (DbHandle _ jobs) a = do
+ res <- newEmptyMVar
+ putMVar jobs $ QueryJob $
+ liftIO . putMVar res =<< tryNonAsync a
+ (either throwIO return =<< takeMVar res)
+ `catchNonAsync` (const $ error "sqlite query crashed")
+
+{- Writes a change to the database.
+ -
+ - If a database is opened multiple times and there's a concurrent writer,
+ - the write could fail. Retries repeatedly for up to 10 seconds,
+ - which should avoid all but the most exceptional problems.
+ -}
+commitDb :: DbHandle -> SqlPersistM () -> IO ()
+commitDb h wa = robustly Nothing 100 (commitDb' h wa)
+ where
+ robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
+ robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e
+ robustly _ n a = do
+ r <- a
+ case r of
+ Right _ -> return ()
+ Left e -> do
+ threadDelay 100000 -- 1/10th second
+ robustly (Just e) (n-1) a
+
+commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
+commitDb' (DbHandle _ jobs) a = do
+ res <- newEmptyMVar
+ putMVar jobs $ ChangeJob $ \runner ->
+ liftIO $ putMVar res =<< tryNonAsync (runner a)
+ takeMVar res
data Job
= QueryJob (SqlPersistM ())
| ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
| CloseJob
-type TableName = String
-
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
workerThread db tablename jobs = catchNonAsync (run loop) showerr
where
@@ -121,97 +167,3 @@ workerThread db tablename jobs = catchNonAsync (run loop) showerr
-- This should succeed for any table.
nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1"
-
-{- Makes a query using the DbHandle. This should not be used to make
- - changes to the database!
- -
- - Note that the action is not run by the calling thread, but by a
- - worker thread. Exceptions are propigated to the calling thread.
- -
- - Only one action can be run at a time against a given DbHandle.
- - If called concurrently in the same process, this will block until
- - it is able to run.
- -}
-queryDb :: DbHandle -> SqlPersistM a -> IO a
-queryDb (DbHandle _ jobs _) a = do
- res <- newEmptyMVar
- putMVar jobs $ QueryJob $
- liftIO . putMVar res =<< tryNonAsync a
- (either throwIO return =<< takeMVar res)
- `catchNonAsync` (const $ error "sqlite query crashed")
-
-closeDb :: DbHandle -> IO ()
-closeDb h@(DbHandle worker jobs _) = do
- putMVar jobs CloseJob
- wait worker
- flushQueueDb h
-
-type Size = Int
-
-type LastCommitTime = UTCTime
-
-{- A queue of actions to perform, with a count of the number of actions
- - queued, and a last commit time. -}
-data DbQueue = DbQueue Size LastCommitTime (SqlPersistM ())
-
-emptyDbQueue :: IO DbQueue
-emptyDbQueue = do
- now <- getCurrentTime
- return $ DbQueue 0 now (return ())
-
-{- Queues a change to be made to the database. It will be buffered
- - to be committed later, unless the commitchecker action returns true.
- -
- - (Be sure to call closeDb or flushQueueDb to ensure the change
- - gets committed.)
- -
- - Transactions built up by queueDb are sent to sqlite all at once.
- - If sqlite fails due to another change being made concurrently by another
- - process, the transaction is put back in the queue. This solves
- - the sqlite multiple writer problem.
- -}
-queueDb
- :: DbHandle
- -> (Size -> LastCommitTime -> IO Bool)
- -> SqlPersistM ()
- -> IO ()
-queueDb h@(DbHandle _ _ qvar) commitchecker a = do
- DbQueue sz lastcommittime qa <- takeMVar qvar
- let !sz' = sz + 1
- let qa' = qa >> a
- let enqueue = putMVar qvar
- ifM (commitchecker sz' lastcommittime)
- ( do
- r <- commitDb h qa'
- case r of
- Left _ -> enqueue $ DbQueue sz' lastcommittime qa'
- Right _ -> do
- now <- getCurrentTime
- enqueue $ DbQueue 0 now (return ())
- , enqueue $ DbQueue sz' lastcommittime qa'
- )
-
-{- If flushing the queue fails, this could be because there is another
- - writer to the database. Retry repeatedly for up to 10 seconds. -}
-flushQueueDb :: DbHandle -> IO ()
-flushQueueDb h@(DbHandle _ _ qvar) = do
- DbQueue sz _ qa <- takeMVar qvar
- when (sz > 0) $
- robustly Nothing 100 (commitDb h qa)
- where
- robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
- robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e
- robustly _ n a = do
- r <- a
- case r of
- Right _ -> return ()
- Left e -> do
- threadDelay 100000 -- 1/10th second
- robustly (Just e) (n-1) a
-
-commitDb :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
-commitDb (DbHandle _ jobs _) a = do
- res <- newEmptyMVar
- putMVar jobs $ ChangeJob $ \runner ->
- liftIO $ putMVar res =<< tryNonAsync (runner a)
- takeMVar res