aboutsummaryrefslogtreecommitdiff
path: root/Database
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2015-12-23 14:59:58 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2015-12-23 14:59:58 -0400
commit1aee8106ac52b8337a12eb7b29bdf2ce2f3b4ca0 (patch)
treee913b730bab350207734a36d9dc2fd05d2393427 /Database
parent8693130a22747fe90c6f950ee43d56ebc8a7c236 (diff)
split out Database.Queue from Database.Handle
Fsck can use the queue for efficiency since it is write-heavy, and only reads a value before writing it. But, the queue is not suited to the Keys database.
Diffstat (limited to 'Database')
-rw-r--r--Database/Fsck.hs11
-rw-r--r--Database/Handle.hs164
-rw-r--r--Database/Keys.hs14
-rw-r--r--Database/Keys/Types.hs4
-rw-r--r--Database/Queue.hs104
5 files changed, 177 insertions, 120 deletions
diff --git a/Database/Fsck.hs b/Database/Fsck.hs
index b0e56f6c0..d176690a6 100644
--- a/Database/Fsck.hs
+++ b/Database/Fsck.hs
@@ -21,7 +21,7 @@ module Database.Fsck (
) where
import Database.Types
-import qualified Database.Handle as H
+import qualified Database.Queue as H
import Locations
import Utility.PosixFiles
import Utility.Exception
@@ -37,7 +37,7 @@ import Database.Persist.TH
import Database.Esqueleto hiding (Key)
import Data.Time.Clock
-data FsckHandle = FsckHandle H.DbHandle UUID
+data FsckHandle = FsckHandle H.DbQueue UUID
{- Each key stored in the database has already been fscked as part
- of the latest incremental fsck pass. -}
@@ -77,7 +77,7 @@ openDb u = do
void $ tryIO $ removeDirectoryRecursive dbdir
rename tmpdbdir dbdir
lockFileCached =<< fromRepo (gitAnnexFsckDbLock u)
- h <- liftIO $ H.openDb db "fscked"
+ h <- liftIO $ H.openDbQueue db "fscked"
-- work around https://github.com/yesodweb/persistent/issues/474
liftIO setConsoleEncoding
@@ -86,7 +86,7 @@ openDb u = do
closeDb :: FsckHandle -> Annex ()
closeDb (FsckHandle h u) = do
- liftIO $ H.closeDb h
+ liftIO $ H.closeDbQueue h
unlockFile =<< fromRepo (gitAnnexFsckDbLock u)
addDb :: FsckHandle -> Key -> IO ()
@@ -102,8 +102,9 @@ addDb (FsckHandle h _) k = H.queueDb h checkcommit $
now <- getCurrentTime
return $ diffUTCTime lastcommittime now > 300
+{- Doesn't know about keys that were just added with addDb. -}
inDb :: FsckHandle -> Key -> IO Bool
-inDb (FsckHandle h _) = H.queryDb h . inDb' . toSKey
+inDb (FsckHandle h _) = H.queryDbQueue h . inDb' . toSKey
inDb' :: SKey -> SqlPersistM Bool
inDb' sk = do
diff --git a/Database/Handle.hs b/Database/Handle.hs
index 67f759265..a45fad22e 100644
--- a/Database/Handle.hs
+++ b/Database/Handle.hs
@@ -11,16 +11,14 @@ module Database.Handle (
DbHandle,
initDb,
openDb,
+ TableName,
queryDb,
closeDb,
- Size,
- queueDb,
- flushQueueDb,
commitDb,
+ commitDb',
) where
import Utility.Exception
-import Utility.Monad
import Database.Persist.Sqlite
import qualified Database.Sqlite as Sqlite
@@ -33,18 +31,17 @@ import qualified Data.Text as T
import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Logger (runNoLoggingT)
import Data.List
-import Data.Time.Clock
import System.IO
{- A DbHandle is a reference to a worker thread that communicates with
- the database. It has a MVar which Jobs are submitted to. -}
-data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue)
+data DbHandle = DbHandle (Async ()) (MVar Job)
{- Ensures that the database is initialized. Pass the migration action for
- the database.
-
- - The database is put into WAL mode, to prevent readers from blocking
- - writers, and prevent a writer from blocking readers.
+ - The database is initialized using WAL mode, to prevent readers
+ - from blocking writers, and prevent a writer from blocking readers.
-}
initDb :: FilePath -> SqlPersistM () -> IO ()
initDb f migration = do
@@ -60,22 +57,71 @@ enableWAL db = do
void $ Sqlite.finalize stmt
Sqlite.close conn
+{- Name of a table that should exist once the database is initialized. -}
+type TableName = String
+
{- Opens the database, but does not perform any migrations. Only use
- if the database is known to exist and have the right tables. -}
openDb :: FilePath -> TableName -> IO DbHandle
openDb db tablename = do
jobs <- newEmptyMVar
worker <- async (workerThread (T.pack db) tablename jobs)
- q <- newMVar =<< emptyDbQueue
- return $ DbHandle worker jobs q
+ return $ DbHandle worker jobs
+
+closeDb :: DbHandle -> IO ()
+closeDb (DbHandle worker jobs) = do
+ putMVar jobs CloseJob
+ wait worker
+
+{- Makes a query using the DbHandle. This should not be used to make
+ - changes to the database!
+ -
+ - Note that the action is not run by the calling thread, but by a
+ - worker thread. Exceptions are propigated to the calling thread.
+ -
+ - Only one action can be run at a time against a given DbHandle.
+ - If called concurrently in the same process, this will block until
+ - it is able to run.
+ -}
+queryDb :: DbHandle -> SqlPersistM a -> IO a
+queryDb (DbHandle _ jobs) a = do
+ res <- newEmptyMVar
+ putMVar jobs $ QueryJob $
+ liftIO . putMVar res =<< tryNonAsync a
+ (either throwIO return =<< takeMVar res)
+ `catchNonAsync` (const $ error "sqlite query crashed")
+
+{- Writes a change to the database.
+ -
+ - If a database is opened multiple times and there's a concurrent writer,
+ - the write could fail. Retries repeatedly for up to 10 seconds,
+ - which should avoid all but the most exceptional problems.
+ -}
+commitDb :: DbHandle -> SqlPersistM () -> IO ()
+commitDb h wa = robustly Nothing 100 (commitDb' h wa)
+ where
+ robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
+ robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e
+ robustly _ n a = do
+ r <- a
+ case r of
+ Right _ -> return ()
+ Left e -> do
+ threadDelay 100000 -- 1/10th second
+ robustly (Just e) (n-1) a
+
+commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
+commitDb' (DbHandle _ jobs) a = do
+ res <- newEmptyMVar
+ putMVar jobs $ ChangeJob $ \runner ->
+ liftIO $ putMVar res =<< tryNonAsync (runner a)
+ takeMVar res
data Job
= QueryJob (SqlPersistM ())
| ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
| CloseJob
-type TableName = String
-
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
workerThread db tablename jobs = catchNonAsync (run loop) showerr
where
@@ -121,97 +167,3 @@ workerThread db tablename jobs = catchNonAsync (run loop) showerr
-- This should succeed for any table.
nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1"
-
-{- Makes a query using the DbHandle. This should not be used to make
- - changes to the database!
- -
- - Note that the action is not run by the calling thread, but by a
- - worker thread. Exceptions are propigated to the calling thread.
- -
- - Only one action can be run at a time against a given DbHandle.
- - If called concurrently in the same process, this will block until
- - it is able to run.
- -}
-queryDb :: DbHandle -> SqlPersistM a -> IO a
-queryDb (DbHandle _ jobs _) a = do
- res <- newEmptyMVar
- putMVar jobs $ QueryJob $
- liftIO . putMVar res =<< tryNonAsync a
- (either throwIO return =<< takeMVar res)
- `catchNonAsync` (const $ error "sqlite query crashed")
-
-closeDb :: DbHandle -> IO ()
-closeDb h@(DbHandle worker jobs _) = do
- putMVar jobs CloseJob
- wait worker
- flushQueueDb h
-
-type Size = Int
-
-type LastCommitTime = UTCTime
-
-{- A queue of actions to perform, with a count of the number of actions
- - queued, and a last commit time. -}
-data DbQueue = DbQueue Size LastCommitTime (SqlPersistM ())
-
-emptyDbQueue :: IO DbQueue
-emptyDbQueue = do
- now <- getCurrentTime
- return $ DbQueue 0 now (return ())
-
-{- Queues a change to be made to the database. It will be buffered
- - to be committed later, unless the commitchecker action returns true.
- -
- - (Be sure to call closeDb or flushQueueDb to ensure the change
- - gets committed.)
- -
- - Transactions built up by queueDb are sent to sqlite all at once.
- - If sqlite fails due to another change being made concurrently by another
- - process, the transaction is put back in the queue. This solves
- - the sqlite multiple writer problem.
- -}
-queueDb
- :: DbHandle
- -> (Size -> LastCommitTime -> IO Bool)
- -> SqlPersistM ()
- -> IO ()
-queueDb h@(DbHandle _ _ qvar) commitchecker a = do
- DbQueue sz lastcommittime qa <- takeMVar qvar
- let !sz' = sz + 1
- let qa' = qa >> a
- let enqueue = putMVar qvar
- ifM (commitchecker sz' lastcommittime)
- ( do
- r <- commitDb h qa'
- case r of
- Left _ -> enqueue $ DbQueue sz' lastcommittime qa'
- Right _ -> do
- now <- getCurrentTime
- enqueue $ DbQueue 0 now (return ())
- , enqueue $ DbQueue sz' lastcommittime qa'
- )
-
-{- If flushing the queue fails, this could be because there is another
- - writer to the database. Retry repeatedly for up to 10 seconds. -}
-flushQueueDb :: DbHandle -> IO ()
-flushQueueDb h@(DbHandle _ _ qvar) = do
- DbQueue sz _ qa <- takeMVar qvar
- when (sz > 0) $
- robustly Nothing 100 (commitDb h qa)
- where
- robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
- robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e
- robustly _ n a = do
- r <- a
- case r of
- Right _ -> return ()
- Left e -> do
- threadDelay 100000 -- 1/10th second
- robustly (Just e) (n-1) a
-
-commitDb :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
-commitDb (DbHandle _ jobs _) a = do
- res <- newEmptyMVar
- putMVar jobs $ ChangeJob $ \runner ->
- liftIO $ putMVar res =<< tryNonAsync (runner a)
- takeMVar res
diff --git a/Database/Keys.hs b/Database/Keys.hs
index a0c5b1a04..425f1d54b 100644
--- a/Database/Keys.hs
+++ b/Database/Keys.hs
@@ -28,7 +28,7 @@ module Database.Keys (
import Database.Types
import Database.Keys.Types
-import qualified Database.Handle as H
+import qualified Database.Queue as H
import Locations
import Common hiding (delete)
import Annex
@@ -72,7 +72,7 @@ openDb = withExclusiveLock gitAnnexKeysDbLock $ do
runMigrationSilent migrateKeysDb
setAnnexDirPerm dbdir
setAnnexFilePerm db
- h <- liftIO $ H.openDb db "content"
+ h <- liftIO $ H.openDbQueue db "content"
-- work around https://github.com/yesodweb/persistent/issues/474
liftIO setConsoleEncoding
@@ -80,9 +80,9 @@ openDb = withExclusiveLock gitAnnexKeysDbLock $ do
return $ DbHandle h
closeDb :: DbHandle -> IO ()
-closeDb (DbHandle h) = H.closeDb h
+closeDb (DbHandle h) = H.closeDbQueue h
-withDbHandle :: (H.DbHandle -> IO a) -> Annex a
+withDbHandle :: (H.DbQueue -> IO a) -> Annex a
withDbHandle a = bracket openDb (liftIO . closeDb) (\(DbHandle h) -> liftIO (a h))
addAssociatedFile :: Key -> FilePath -> Annex ()
@@ -98,7 +98,7 @@ addAssociatedFile k f = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $ d
{- Note that the files returned were once associated with the key, but
- some of them may not be any longer. -}
getAssociatedFiles :: Key -> Annex [FilePath]
-getAssociatedFiles k = withDbHandle $ \h -> H.queryDb h $
+getAssociatedFiles k = withDbHandle $ \h -> H.queryDbQueue h $
getAssociatedFiles' $ toSKey k
getAssociatedFiles' :: SKey -> SqlPersistM [FilePath]
@@ -111,7 +111,7 @@ getAssociatedFiles' sk = do
{- Gets any keys that are on record as having a particular associated file.
- (Should be one or none but the database doesn't enforce that.) -}
getAssociatedKey :: FilePath -> Annex [Key]
-getAssociatedKey f = withDbHandle $ \h -> H.queryDb h $
+getAssociatedKey f = withDbHandle $ \h -> H.queryDbQueue h $
getAssociatedKey' f
getAssociatedKey' :: FilePath -> SqlPersistM [Key]
@@ -140,7 +140,7 @@ addInodeCaches k is = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $
{- A key may have multiple InodeCaches; one for the annex object, and one
- for each pointer file that is a copy of it. -}
getInodeCaches :: Key -> Annex [InodeCache]
-getInodeCaches k = withDbHandle $ \h -> H.queryDb h $ do
+getInodeCaches k = withDbHandle $ \h -> H.queryDbQueue h $ do
l <- select $ from $ \r -> do
where_ (r ^. ContentKey ==. val sk)
return (r ^. ContentCache)
diff --git a/Database/Keys/Types.hs b/Database/Keys/Types.hs
index a627b3ca5..3fabafcf2 100644
--- a/Database/Keys/Types.hs
+++ b/Database/Keys/Types.hs
@@ -9,6 +9,6 @@ module Database.Keys.Types (
DbHandle(..)
) where
-import qualified Database.Handle as H
+import qualified Database.Queue as H
-newtype DbHandle = DbHandle H.DbHandle
+newtype DbHandle = DbHandle H.DbQueue
diff --git a/Database/Queue.hs b/Database/Queue.hs
new file mode 100644
index 000000000..149854757
--- /dev/null
+++ b/Database/Queue.hs
@@ -0,0 +1,104 @@
+{- Persistent sqlite database queues
+ -
+ - Copyright 2015 Joey Hess <id@joeyh.name>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE BangPatterns #-}
+
+module Database.Queue (
+ DbQueue,
+ initDb,
+ openDbQueue,
+ queryDbQueue,
+ closeDbQueue,
+ QueueSize,
+ queueDb,
+) where
+
+import Utility.Monad
+import Database.Handle
+
+import Database.Persist.Sqlite
+import Control.Monad
+import Control.Concurrent
+import Data.Time.Clock
+
+{- A DbQueue wraps a DbHandle, adding a queue of writes to perform.
+ -
+ - This is efficient when there are frequent writes, but
+ - reads will not immediately have access to queued writes. -}
+data DbQueue = DQ DbHandle (MVar Queue)
+
+{- Opens the database queue, but does not perform any migrations. Only use
+ - if the database is known to exist and have the right tables; ie after
+ - running initDb. -}
+openDbQueue :: FilePath -> TableName -> IO DbQueue
+openDbQueue db tablename = DQ
+ <$> openDb db tablename
+ <*> (newMVar =<< emptyQueue)
+
+{- Must be called to ensure queued changes get written to the database. -}
+closeDbQueue :: DbQueue -> IO ()
+closeDbQueue h@(DQ hdl _) = do
+ flushDbQueue h
+ closeDb hdl
+
+{- Makes a queury using the DbQueue. This should not be used to make
+ - changes to the database!
+ -
+ - Queries will not return changes that have been recently queued,
+ - so use with care.
+ -}
+queryDbQueue :: DbQueue -> SqlPersistM a -> IO a
+queryDbQueue (DQ hdl _) = queryDb hdl
+
+{- A queue of actions to perform, with a count of the number of actions
+ - queued, and a last commit time. -}
+data Queue = Queue QueueSize LastCommitTime (SqlPersistM ())
+
+type QueueSize = Int
+
+type LastCommitTime = UTCTime
+
+emptyQueue :: IO Queue
+emptyQueue = do
+ now <- getCurrentTime
+ return $ Queue 0 now (return ())
+
+flushDbQueue :: DbQueue -> IO ()
+flushDbQueue (DQ hdl qvar) = do
+ Queue sz _ qa <- takeMVar qvar
+ when (sz > 0) $
+ commitDb hdl qa
+
+{- Queues a change to be made to the database. It will be queued
+ - to be committed later, unless the commitchecker action returns true,
+ - in which case any previously queued changes are also committed.
+ -
+ - Transactions built up by queueDb are sent to sqlite all at once.
+ - If sqlite fails due to another change being made concurrently by another
+ - process, the transaction is put back in the queue. This avoids
+ - the sqlite multiple writer problem.
+ -}
+queueDb
+ :: DbQueue
+ -> (QueueSize -> LastCommitTime -> IO Bool)
+ -> SqlPersistM ()
+ -> IO ()
+queueDb (DQ hdl qvar) commitchecker a = do
+ Queue sz lastcommittime qa <- takeMVar qvar
+ let !sz' = sz + 1
+ let qa' = qa >> a
+ let enqueue = putMVar qvar
+ ifM (commitchecker sz' lastcommittime)
+ ( do
+ r <- commitDb' hdl qa'
+ case r of
+ Left _ -> enqueue $ Queue sz' lastcommittime qa'
+ Right _ -> do
+ now <- getCurrentTime
+ enqueue $ Queue 0 now (return ())
+ , enqueue $ Queue sz' lastcommittime qa'
+ )