diff options
author | Joey Hess <joeyh@joeyh.name> | 2015-12-23 14:59:58 -0400 |
---|---|---|
committer | Joey Hess <joeyh@joeyh.name> | 2015-12-23 14:59:58 -0400 |
commit | 1aee8106ac52b8337a12eb7b29bdf2ce2f3b4ca0 (patch) | |
tree | e913b730bab350207734a36d9dc2fd05d2393427 /Database | |
parent | 8693130a22747fe90c6f950ee43d56ebc8a7c236 (diff) |
split out Database.Queue from Database.Handle
Fsck can use the queue for efficiency since it is write-heavy, and only
reads a value before writing it. But, the queue is not suited to the Keys
database.
Diffstat (limited to 'Database')
-rw-r--r-- | Database/Fsck.hs | 11 | ||||
-rw-r--r-- | Database/Handle.hs | 164 | ||||
-rw-r--r-- | Database/Keys.hs | 14 | ||||
-rw-r--r-- | Database/Keys/Types.hs | 4 | ||||
-rw-r--r-- | Database/Queue.hs | 104 |
5 files changed, 177 insertions, 120 deletions
diff --git a/Database/Fsck.hs b/Database/Fsck.hs index b0e56f6c0..d176690a6 100644 --- a/Database/Fsck.hs +++ b/Database/Fsck.hs @@ -21,7 +21,7 @@ module Database.Fsck ( ) where import Database.Types -import qualified Database.Handle as H +import qualified Database.Queue as H import Locations import Utility.PosixFiles import Utility.Exception @@ -37,7 +37,7 @@ import Database.Persist.TH import Database.Esqueleto hiding (Key) import Data.Time.Clock -data FsckHandle = FsckHandle H.DbHandle UUID +data FsckHandle = FsckHandle H.DbQueue UUID {- Each key stored in the database has already been fscked as part - of the latest incremental fsck pass. -} @@ -77,7 +77,7 @@ openDb u = do void $ tryIO $ removeDirectoryRecursive dbdir rename tmpdbdir dbdir lockFileCached =<< fromRepo (gitAnnexFsckDbLock u) - h <- liftIO $ H.openDb db "fscked" + h <- liftIO $ H.openDbQueue db "fscked" -- work around https://github.com/yesodweb/persistent/issues/474 liftIO setConsoleEncoding @@ -86,7 +86,7 @@ openDb u = do closeDb :: FsckHandle -> Annex () closeDb (FsckHandle h u) = do - liftIO $ H.closeDb h + liftIO $ H.closeDbQueue h unlockFile =<< fromRepo (gitAnnexFsckDbLock u) addDb :: FsckHandle -> Key -> IO () @@ -102,8 +102,9 @@ addDb (FsckHandle h _) k = H.queueDb h checkcommit $ now <- getCurrentTime return $ diffUTCTime lastcommittime now > 300 +{- Doesn't know about keys that were just added with addDb. -} inDb :: FsckHandle -> Key -> IO Bool -inDb (FsckHandle h _) = H.queryDb h . inDb' . toSKey +inDb (FsckHandle h _) = H.queryDbQueue h . inDb' . toSKey inDb' :: SKey -> SqlPersistM Bool inDb' sk = do diff --git a/Database/Handle.hs b/Database/Handle.hs index 67f759265..a45fad22e 100644 --- a/Database/Handle.hs +++ b/Database/Handle.hs @@ -11,16 +11,14 @@ module Database.Handle ( DbHandle, initDb, openDb, + TableName, queryDb, closeDb, - Size, - queueDb, - flushQueueDb, commitDb, + commitDb', ) where import Utility.Exception -import Utility.Monad import Database.Persist.Sqlite import qualified Database.Sqlite as Sqlite @@ -33,18 +31,17 @@ import qualified Data.Text as T import Control.Monad.Trans.Resource (runResourceT) import Control.Monad.Logger (runNoLoggingT) import Data.List -import Data.Time.Clock import System.IO {- A DbHandle is a reference to a worker thread that communicates with - the database. It has a MVar which Jobs are submitted to. -} -data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue) +data DbHandle = DbHandle (Async ()) (MVar Job) {- Ensures that the database is initialized. Pass the migration action for - the database. - - - The database is put into WAL mode, to prevent readers from blocking - - writers, and prevent a writer from blocking readers. + - The database is initialized using WAL mode, to prevent readers + - from blocking writers, and prevent a writer from blocking readers. -} initDb :: FilePath -> SqlPersistM () -> IO () initDb f migration = do @@ -60,22 +57,71 @@ enableWAL db = do void $ Sqlite.finalize stmt Sqlite.close conn +{- Name of a table that should exist once the database is initialized. -} +type TableName = String + {- Opens the database, but does not perform any migrations. Only use - if the database is known to exist and have the right tables. -} openDb :: FilePath -> TableName -> IO DbHandle openDb db tablename = do jobs <- newEmptyMVar worker <- async (workerThread (T.pack db) tablename jobs) - q <- newMVar =<< emptyDbQueue - return $ DbHandle worker jobs q + return $ DbHandle worker jobs + +closeDb :: DbHandle -> IO () +closeDb (DbHandle worker jobs) = do + putMVar jobs CloseJob + wait worker + +{- Makes a query using the DbHandle. This should not be used to make + - changes to the database! + - + - Note that the action is not run by the calling thread, but by a + - worker thread. Exceptions are propigated to the calling thread. + - + - Only one action can be run at a time against a given DbHandle. + - If called concurrently in the same process, this will block until + - it is able to run. + -} +queryDb :: DbHandle -> SqlPersistM a -> IO a +queryDb (DbHandle _ jobs) a = do + res <- newEmptyMVar + putMVar jobs $ QueryJob $ + liftIO . putMVar res =<< tryNonAsync a + (either throwIO return =<< takeMVar res) + `catchNonAsync` (const $ error "sqlite query crashed") + +{- Writes a change to the database. + - + - If a database is opened multiple times and there's a concurrent writer, + - the write could fail. Retries repeatedly for up to 10 seconds, + - which should avoid all but the most exceptional problems. + -} +commitDb :: DbHandle -> SqlPersistM () -> IO () +commitDb h wa = robustly Nothing 100 (commitDb' h wa) + where + robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO () + robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e + robustly _ n a = do + r <- a + case r of + Right _ -> return () + Left e -> do + threadDelay 100000 -- 1/10th second + robustly (Just e) (n-1) a + +commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ()) +commitDb' (DbHandle _ jobs) a = do + res <- newEmptyMVar + putMVar jobs $ ChangeJob $ \runner -> + liftIO $ putMVar res =<< tryNonAsync (runner a) + takeMVar res data Job = QueryJob (SqlPersistM ()) | ChangeJob ((SqlPersistM () -> IO ()) -> IO ()) | CloseJob -type TableName = String - workerThread :: T.Text -> TableName -> MVar Job -> IO () workerThread db tablename jobs = catchNonAsync (run loop) showerr where @@ -121,97 +167,3 @@ workerThread db tablename jobs = catchNonAsync (run loop) showerr -- This should succeed for any table. nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1" - -{- Makes a query using the DbHandle. This should not be used to make - - changes to the database! - - - - Note that the action is not run by the calling thread, but by a - - worker thread. Exceptions are propigated to the calling thread. - - - - Only one action can be run at a time against a given DbHandle. - - If called concurrently in the same process, this will block until - - it is able to run. - -} -queryDb :: DbHandle -> SqlPersistM a -> IO a -queryDb (DbHandle _ jobs _) a = do - res <- newEmptyMVar - putMVar jobs $ QueryJob $ - liftIO . putMVar res =<< tryNonAsync a - (either throwIO return =<< takeMVar res) - `catchNonAsync` (const $ error "sqlite query crashed") - -closeDb :: DbHandle -> IO () -closeDb h@(DbHandle worker jobs _) = do - putMVar jobs CloseJob - wait worker - flushQueueDb h - -type Size = Int - -type LastCommitTime = UTCTime - -{- A queue of actions to perform, with a count of the number of actions - - queued, and a last commit time. -} -data DbQueue = DbQueue Size LastCommitTime (SqlPersistM ()) - -emptyDbQueue :: IO DbQueue -emptyDbQueue = do - now <- getCurrentTime - return $ DbQueue 0 now (return ()) - -{- Queues a change to be made to the database. It will be buffered - - to be committed later, unless the commitchecker action returns true. - - - - (Be sure to call closeDb or flushQueueDb to ensure the change - - gets committed.) - - - - Transactions built up by queueDb are sent to sqlite all at once. - - If sqlite fails due to another change being made concurrently by another - - process, the transaction is put back in the queue. This solves - - the sqlite multiple writer problem. - -} -queueDb - :: DbHandle - -> (Size -> LastCommitTime -> IO Bool) - -> SqlPersistM () - -> IO () -queueDb h@(DbHandle _ _ qvar) commitchecker a = do - DbQueue sz lastcommittime qa <- takeMVar qvar - let !sz' = sz + 1 - let qa' = qa >> a - let enqueue = putMVar qvar - ifM (commitchecker sz' lastcommittime) - ( do - r <- commitDb h qa' - case r of - Left _ -> enqueue $ DbQueue sz' lastcommittime qa' - Right _ -> do - now <- getCurrentTime - enqueue $ DbQueue 0 now (return ()) - , enqueue $ DbQueue sz' lastcommittime qa' - ) - -{- If flushing the queue fails, this could be because there is another - - writer to the database. Retry repeatedly for up to 10 seconds. -} -flushQueueDb :: DbHandle -> IO () -flushQueueDb h@(DbHandle _ _ qvar) = do - DbQueue sz _ qa <- takeMVar qvar - when (sz > 0) $ - robustly Nothing 100 (commitDb h qa) - where - robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO () - robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e - robustly _ n a = do - r <- a - case r of - Right _ -> return () - Left e -> do - threadDelay 100000 -- 1/10th second - robustly (Just e) (n-1) a - -commitDb :: DbHandle -> SqlPersistM () -> IO (Either SomeException ()) -commitDb (DbHandle _ jobs _) a = do - res <- newEmptyMVar - putMVar jobs $ ChangeJob $ \runner -> - liftIO $ putMVar res =<< tryNonAsync (runner a) - takeMVar res diff --git a/Database/Keys.hs b/Database/Keys.hs index a0c5b1a04..425f1d54b 100644 --- a/Database/Keys.hs +++ b/Database/Keys.hs @@ -28,7 +28,7 @@ module Database.Keys ( import Database.Types import Database.Keys.Types -import qualified Database.Handle as H +import qualified Database.Queue as H import Locations import Common hiding (delete) import Annex @@ -72,7 +72,7 @@ openDb = withExclusiveLock gitAnnexKeysDbLock $ do runMigrationSilent migrateKeysDb setAnnexDirPerm dbdir setAnnexFilePerm db - h <- liftIO $ H.openDb db "content" + h <- liftIO $ H.openDbQueue db "content" -- work around https://github.com/yesodweb/persistent/issues/474 liftIO setConsoleEncoding @@ -80,9 +80,9 @@ openDb = withExclusiveLock gitAnnexKeysDbLock $ do return $ DbHandle h closeDb :: DbHandle -> IO () -closeDb (DbHandle h) = H.closeDb h +closeDb (DbHandle h) = H.closeDbQueue h -withDbHandle :: (H.DbHandle -> IO a) -> Annex a +withDbHandle :: (H.DbQueue -> IO a) -> Annex a withDbHandle a = bracket openDb (liftIO . closeDb) (\(DbHandle h) -> liftIO (a h)) addAssociatedFile :: Key -> FilePath -> Annex () @@ -98,7 +98,7 @@ addAssociatedFile k f = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $ d {- Note that the files returned were once associated with the key, but - some of them may not be any longer. -} getAssociatedFiles :: Key -> Annex [FilePath] -getAssociatedFiles k = withDbHandle $ \h -> H.queryDb h $ +getAssociatedFiles k = withDbHandle $ \h -> H.queryDbQueue h $ getAssociatedFiles' $ toSKey k getAssociatedFiles' :: SKey -> SqlPersistM [FilePath] @@ -111,7 +111,7 @@ getAssociatedFiles' sk = do {- Gets any keys that are on record as having a particular associated file. - (Should be one or none but the database doesn't enforce that.) -} getAssociatedKey :: FilePath -> Annex [Key] -getAssociatedKey f = withDbHandle $ \h -> H.queryDb h $ +getAssociatedKey f = withDbHandle $ \h -> H.queryDbQueue h $ getAssociatedKey' f getAssociatedKey' :: FilePath -> SqlPersistM [Key] @@ -140,7 +140,7 @@ addInodeCaches k is = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $ {- A key may have multiple InodeCaches; one for the annex object, and one - for each pointer file that is a copy of it. -} getInodeCaches :: Key -> Annex [InodeCache] -getInodeCaches k = withDbHandle $ \h -> H.queryDb h $ do +getInodeCaches k = withDbHandle $ \h -> H.queryDbQueue h $ do l <- select $ from $ \r -> do where_ (r ^. ContentKey ==. val sk) return (r ^. ContentCache) diff --git a/Database/Keys/Types.hs b/Database/Keys/Types.hs index a627b3ca5..3fabafcf2 100644 --- a/Database/Keys/Types.hs +++ b/Database/Keys/Types.hs @@ -9,6 +9,6 @@ module Database.Keys.Types ( DbHandle(..) ) where -import qualified Database.Handle as H +import qualified Database.Queue as H -newtype DbHandle = DbHandle H.DbHandle +newtype DbHandle = DbHandle H.DbQueue diff --git a/Database/Queue.hs b/Database/Queue.hs new file mode 100644 index 000000000..149854757 --- /dev/null +++ b/Database/Queue.hs @@ -0,0 +1,104 @@ +{- Persistent sqlite database queues + - + - Copyright 2015 Joey Hess <id@joeyh.name> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE BangPatterns #-} + +module Database.Queue ( + DbQueue, + initDb, + openDbQueue, + queryDbQueue, + closeDbQueue, + QueueSize, + queueDb, +) where + +import Utility.Monad +import Database.Handle + +import Database.Persist.Sqlite +import Control.Monad +import Control.Concurrent +import Data.Time.Clock + +{- A DbQueue wraps a DbHandle, adding a queue of writes to perform. + - + - This is efficient when there are frequent writes, but + - reads will not immediately have access to queued writes. -} +data DbQueue = DQ DbHandle (MVar Queue) + +{- Opens the database queue, but does not perform any migrations. Only use + - if the database is known to exist and have the right tables; ie after + - running initDb. -} +openDbQueue :: FilePath -> TableName -> IO DbQueue +openDbQueue db tablename = DQ + <$> openDb db tablename + <*> (newMVar =<< emptyQueue) + +{- Must be called to ensure queued changes get written to the database. -} +closeDbQueue :: DbQueue -> IO () +closeDbQueue h@(DQ hdl _) = do + flushDbQueue h + closeDb hdl + +{- Makes a queury using the DbQueue. This should not be used to make + - changes to the database! + - + - Queries will not return changes that have been recently queued, + - so use with care. + -} +queryDbQueue :: DbQueue -> SqlPersistM a -> IO a +queryDbQueue (DQ hdl _) = queryDb hdl + +{- A queue of actions to perform, with a count of the number of actions + - queued, and a last commit time. -} +data Queue = Queue QueueSize LastCommitTime (SqlPersistM ()) + +type QueueSize = Int + +type LastCommitTime = UTCTime + +emptyQueue :: IO Queue +emptyQueue = do + now <- getCurrentTime + return $ Queue 0 now (return ()) + +flushDbQueue :: DbQueue -> IO () +flushDbQueue (DQ hdl qvar) = do + Queue sz _ qa <- takeMVar qvar + when (sz > 0) $ + commitDb hdl qa + +{- Queues a change to be made to the database. It will be queued + - to be committed later, unless the commitchecker action returns true, + - in which case any previously queued changes are also committed. + - + - Transactions built up by queueDb are sent to sqlite all at once. + - If sqlite fails due to another change being made concurrently by another + - process, the transaction is put back in the queue. This avoids + - the sqlite multiple writer problem. + -} +queueDb + :: DbQueue + -> (QueueSize -> LastCommitTime -> IO Bool) + -> SqlPersistM () + -> IO () +queueDb (DQ hdl qvar) commitchecker a = do + Queue sz lastcommittime qa <- takeMVar qvar + let !sz' = sz + 1 + let qa' = qa >> a + let enqueue = putMVar qvar + ifM (commitchecker sz' lastcommittime) + ( do + r <- commitDb' hdl qa' + case r of + Left _ -> enqueue $ Queue sz' lastcommittime qa' + Right _ -> do + now <- getCurrentTime + enqueue $ Queue 0 now (return ()) + , enqueue $ Queue sz' lastcommittime qa' + ) |