diff options
-rw-r--r-- | Command/Fsck.hs | 4 | ||||
-rw-r--r-- | Database/Fsck.hs | 10 | ||||
-rw-r--r-- | Database/Handle.hs | 34 |
3 files changed, 36 insertions, 12 deletions
diff --git a/Command/Fsck.hs b/Command/Fsck.hs index 59d07caf4..9dba21dbf 100644 --- a/Command/Fsck.hs +++ b/Command/Fsck.hs @@ -443,9 +443,7 @@ withFsckDb (StartIncremental h) a = a h withFsckDb NonIncremental _ = noop recordFsckTime :: Incremental -> Key -> Annex () -recordFsckTime inc key = withFsckDb inc $ \h -> liftIO $ do - FsckDb.addDb h key - FsckDb.commitDb h +recordFsckTime inc key = withFsckDb inc $ \h -> liftIO $ FsckDb.addDb h key {- Records the start time of an incremental fsck. - diff --git a/Database/Fsck.hs b/Database/Fsck.hs index 2b622e844..e52603d9c 100644 --- a/Database/Fsck.hs +++ b/Database/Fsck.hs @@ -40,7 +40,6 @@ share [mkPersist sqlSettings, mkMigrate "migrateFsck"] [persistLowerCase| Fscked key SKey UniqueKey key - deriving Show |] {- The database is removed when starting a new incremental fsck pass. -} @@ -62,7 +61,7 @@ openDb = do liftIO $ H.openDb db addDb :: H.DbHandle -> Key -> IO () -addDb h = void . H.runDb h . insert . Fscked . toSKey +addDb h = void . H.runDb' h commitPolicy . insert . Fscked . toSKey inDb :: H.DbHandle -> Key -> IO Bool inDb h k = H.runDb h $ do @@ -70,3 +69,10 @@ inDb h k = H.runDb h $ do where_ (r ^. FsckedKey ==. val (toSKey k)) return (r ^. FsckedKey) return $ not $ null r + +{- Bundle up addDb transactions and commit after 60 seconds. + - This is a balance between resuming where the last incremental + - fsck left off, and making too many commits which slows down the fsck + - of lots of small or not present files. -} +commitPolicy :: H.CommitPolicy +commitPolicy = H.CommitAfterSeconds 60 diff --git a/Database/Handle.hs b/Database/Handle.hs index b42c32812..59cab0e44 100644 --- a/Database/Handle.hs +++ b/Database/Handle.hs @@ -9,6 +9,8 @@ module Database.Handle ( DbHandle, openDb, runDb, + CommitPolicy(..), + runDb', commitDb, closeDb, ) where @@ -17,15 +19,17 @@ import Utility.Exception import Database.Persist.Sqlite (runSqlite) import Database.Esqueleto hiding (Key) +import Control.Monad import Control.Monad.IO.Class (liftIO) import Control.Concurrent import Control.Concurrent.Async import Control.Exception (throwIO) import qualified Data.Text as T +import Data.Time.Clock {- A DbHandle is a reference to a worker thread that communicates with - the database. It has a MVar which Jobs are submitted to. -} -data DbHandle = DbHandle (Async ()) (MVar Job) +data DbHandle = DbHandle (Async ()) (MVar Job) (MVar UTCTime) data Job = RunJob (SqlPersistM ()) | CommitJob | CloseJob @@ -33,7 +37,8 @@ openDb :: FilePath -> IO DbHandle openDb db = do jobs <- newEmptyMVar worker <- async (workerThread (T.pack db) jobs) - return $ DbHandle worker jobs + t <- newMVar =<< getCurrentTime + return $ DbHandle worker jobs t workerThread :: T.Text -> MVar Job -> IO () workerThread db jobs = go @@ -50,26 +55,41 @@ workerThread db jobs = go CommitJob -> return CommitJob CloseJob -> return CloseJob + {- Runs an action using the DbHandle. - - Note that the action is not run by the calling thread, but by a - worker thread. Exceptions are propigated to the calling thread. - - - Note that only one action can be run at a time against a given DbHandle. + - Only one action can be run at a time against a given DbHandle. - If called concurrently, this will block until it is able to run. -} runDb :: DbHandle -> SqlPersistM a -> IO a -runDb (DbHandle _ jobs) a = do +runDb h = runDb' h CommitManually + +data CommitPolicy = CommitManually | CommitAfterSeconds Int + +runDb' :: DbHandle -> CommitPolicy -> SqlPersistM a -> IO a +runDb' h@(DbHandle _ jobs t) pol a = do res <- newEmptyMVar putMVar jobs $ RunJob $ liftIO . putMVar res =<< tryNonAsync a - either throwIO return =<< takeMVar res + r <- either throwIO return =<< takeMVar res + case pol of + CommitManually -> return () + CommitAfterSeconds n -> do + now <- getCurrentTime + prev <- takeMVar t + putMVar t now + when (diffUTCTime now prev > fromIntegral n) $ + commitDb h + return r {- Commits any transaction that was created by the previous calls to runDb, - and starts a new transaction. -} commitDb :: DbHandle -> IO () -commitDb (DbHandle _ jobs) = putMVar jobs CommitJob +commitDb (DbHandle _ jobs _) = putMVar jobs CommitJob closeDb :: DbHandle -> IO () -closeDb (DbHandle worker jobs) = do +closeDb (DbHandle worker jobs _) = do putMVar jobs CloseJob wait worker |