diff options
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/Changes.hs | 38 | ||||
-rw-r--r-- | Assistant/Committer.hs | 162 | ||||
-rw-r--r-- | Assistant/DaemonStatus.hs | 10 | ||||
-rw-r--r-- | Assistant/Watcher.hs | 32 |
4 files changed, 135 insertions, 107 deletions
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs index 1cad42326..173ba1922 100644 --- a/Assistant/Changes.hs +++ b/Assistant/Changes.hs @@ -7,20 +7,26 @@ module Assistant.Changes where import Common.Annex import qualified Annex.Queue +import Types.KeySource import Control.Concurrent.STM import Data.Time.Clock -data ChangeType = PendingAddChange | LinkChange | RmChange | RmDirChange +data ChangeType = AddChange | LinkChange | RmChange | RmDirChange deriving (Show, Eq) type ChangeChan = TChan Change -data Change = Change - { changeTime :: UTCTime - , changeFile :: FilePath - , changeType :: ChangeType - } +data Change + = Change + { changeTime :: UTCTime + , changeFile :: FilePath + , changeType :: ChangeType + } + | PendingAddChange + { changeTime ::UTCTime + , keySource :: KeySource + } deriving (Show) runChangeChan :: STM a -> IO a @@ -33,13 +39,29 @@ newChangeChan = atomically newTChan madeChange :: FilePath -> ChangeType -> Annex (Maybe Change) madeChange f t = do -- Just in case the commit thread is not flushing the queue fast enough. - when (t /= PendingAddChange) $ - Annex.Queue.flushWhenFull + Annex.Queue.flushWhenFull liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t) noChange :: Annex (Maybe Change) noChange = return Nothing +{- Indicates an add is in progress. -} +pendingAddChange :: KeySource -> Annex (Maybe Change) +pendingAddChange ks = + liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure ks) + +isPendingAddChange :: Change -> Bool +isPendingAddChange (PendingAddChange {}) = True +isPendingAddChange _ = False + +finishedChange :: Change -> Change +finishedChange c@(PendingAddChange { keySource = ks }) = Change + { changeTime = changeTime c + , changeFile = keyFilename ks + , changeType = AddChange + } +finishedChange c = c + {- Gets all unhandled changes. - Blocks until at least one change is made. -} getChanges :: ChangeChan -> IO [Change] diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs index 600034a0a..46fee1b74 100644 --- a/Assistant/Committer.hs +++ b/Assistant/Committer.hs @@ -7,7 +7,6 @@ module Assistant.Committer where import Common.Annex import Assistant.Changes -import Assistant.DaemonStatus import Assistant.ThreadedMonad import Assistant.Watcher import qualified Annex @@ -24,20 +23,25 @@ import Types.KeySource import Data.Time.Clock import Data.Tuple.Utils import qualified Data.Set as S +import Data.Either {- This thread makes git commits at appropriate times. -} -commitThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () -commitThread st dstatus changechan = runEvery (Seconds 1) $ do +commitThread :: ThreadState -> ChangeChan -> IO () +commitThread st changechan = runEvery (Seconds 1) $ do -- We already waited one second as a simple rate limiter. - -- Next, wait until at least one change has been made. - cs <- getChanges changechan + -- Next, wait until at least one change is available for + -- processing. + changes <- getChanges changechan -- Now see if now's a good time to commit. time <- getCurrentTime - if shouldCommit time cs + if shouldCommit time changes then do - handleAdds st dstatus changechan cs - void $ tryIO $ runThreadState st commitStaged - else refillChanges changechan cs + readychanges <- handleAdds st changechan changes + if shouldCommit time readychanges + then do + void $ tryIO $ runThreadState st commitStaged + else refillChanges changechan readychanges + else refillChanges changechan changes commitStaged :: Annex () commitStaged = do @@ -83,95 +87,99 @@ shouldCommit now changes - staged before returning, and will be committed immediately. - - OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly - - created and staged, if the file is not open. + - created and staged. + - + - Returns a list of all changes that are ready to be committed. + - Any pending adds that are not ready yet are put back into the ChangeChan, + - where they will be retried later. -} -handleAdds :: ThreadState -> DaemonStatusHandle -> ChangeChan -> [Change] -> IO () -handleAdds st dstatus changechan cs - | null toadd = noop - | otherwise = do - toadd' <- safeToAdd st dstatus toadd - unless (null toadd') $ do - added <- filter id <$> forM toadd' add - unless (DirWatcher.eventsCoalesce || null added) $ - handleAdds st dstatus changechan +handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change] +handleAdds st changechan cs = returnWhen (null pendingadds) $ do + (postponed, toadd) <- partitionEithers <$> + safeToAdd st pendingadds + + unless (null postponed) $ + refillChanges changechan postponed + + returnWhen (null toadd) $ do + added <- catMaybes <$> forM toadd add + if (DirWatcher.eventsCoalesce || null added) + then return $ added ++ otherchanges + else do + r <- handleAdds st changechan =<< getChanges changechan + return $ r ++ added ++ otherchanges where - toadd = map changeFile $ filter isPendingAdd cs + (pendingadds, otherchanges) = partition isPendingAddChange cs + + returnWhen c a + | c = return otherchanges + | otherwise = a - isPendingAdd (Change { changeType = PendingAddChange }) = True - isPendingAdd _ = False + add :: Change -> IO (Maybe Change) + add change@(PendingAddChange { keySource = ks }) = do + r <- catchMaybeIO $ runThreadState st $ do + showStart "add" $ keyFilename ks + handle (finishedChange change) (keyFilename ks) + =<< Command.Add.ingest ks + return $ maybeMaybe r + add _ = return Nothing - add keysource = catchBoolIO $ runThreadState st $ do - showStart "add" $ keyFilename keysource - handle (keyFilename keysource) - =<< Command.Add.ingest keysource + maybeMaybe (Just j@(Just _)) = j + maybeMaybe _ = Nothing - handle _ Nothing = do + handle _ _ Nothing = do showEndFail - return False - handle file (Just key) = do + return Nothing + handle change file (Just key) = do link <- Command.Add.link file key True when DirWatcher.eventsCoalesce $ do sha <- inRepo $ Git.HashObject.hashObject BlobObject link stageSymlink file sha showEndOk - return True + return $ Just change -{- Checks which of a set of files can safely be added. - - Files are locked down as hard links in a temp directory, - - with their write bits disabled. But some may still be - - opened for write, so lsof is run on the temp directory - - to check them. +{- PendingAddChanges can Either be Right to be added now, + - or are unsafe, and must be Left for later. + - + - Check by running lsof on the temp directory, which + - the KeySources are locked down in. -} -safeToAdd :: ThreadState -> DaemonStatusHandle -> [FilePath] -> IO [KeySource] -safeToAdd st dstatus files = do - locked <- catMaybes <$> lockdown files - runThreadState st $ ifM (Annex.getState Annex.force) - ( return locked -- force bypasses lsof check +safeToAdd :: ThreadState -> [Change] -> IO [Either Change Change] +safeToAdd st changes = runThreadState st $ + ifM (Annex.getState Annex.force) + ( allRight changes -- force bypasses lsof check , do tmpdir <- fromRepo gitAnnexTmpDir - open <- S.fromList . map fst3 . filter openwrite <$> + openfiles <- S.fromList . map fst3 . filter openwrite <$> liftIO (Lsof.queryDir tmpdir) - catMaybes <$> forM locked (go open) + + let checked = map (check openfiles) changes + + {- If new events are received when files are closed, + - there's no need to retry any changes that cannot + - be done now. -} + if DirWatcher.closingTracked + then do + mapM_ canceladd $ lefts checked + allRight $ rights checked + else return checked ) where - {- When a file is still open, it can be put into pendingAdd - - to be checked again later. However when closingTracked - - is supported, another event will be received once it's - - closed, so there's no point in doing so. -} - go open keysource - | S.member (contentLocation keysource) open = do - if DirWatcher.closingTracked - then do - warning $ keyFilename keysource - ++ " still has writers, not adding" - void $ liftIO $ canceladd keysource - else void $ addpending keysource - return Nothing - | otherwise = return $ Just keysource - - canceladd keysource = tryIO $ + check openfiles change@(PendingAddChange { keySource = ks }) + | S.member (contentLocation ks) openfiles = Left change + check _ change = Right change + + canceladd (PendingAddChange { keySource = ks }) = do + warning $ keyFilename ks + ++ " still has writers, not adding" -- remove the hard link - removeFile $ contentLocation keysource - - {- The same file (or a file with the same name) - - could already be pending add; if so this KeySource - - superscedes the old one. -} - addpending keysource = modifyDaemonStatusM dstatus $ \s -> do - let set = pendingAdd s - mapM_ canceladd $ S.toList $ S.filter (== keysource) set - return $ s { pendingAdd = S.insert keysource set } - - lockdown = mapM $ \file -> do - ms <- catchMaybeIO $ getSymbolicLinkStatus file - case ms of - Just s - | isRegularFile s -> - catchMaybeIO $ runThreadState st $ - Command.Add.lockDown file - _ -> return Nothing - + void $ liftIO $ tryIO $ + removeFile $ contentLocation ks + canceladd _ = noop openwrite (_file, mode, _pid) = mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite + + allRight = return . map Right diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index 289a97bb2..e5ba3d151 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -9,14 +9,12 @@ import Common.Annex import Assistant.ThreadedMonad import Utility.ThreadScheduler import Utility.TempFile -import Types.KeySource import Control.Concurrent import System.Posix.Types import Data.Time.Clock.POSIX import Data.Time import System.Locale -import qualified Data.Set as S data DaemonStatus = DaemonStatus -- False when the daemon is performing its startup scan @@ -27,8 +25,6 @@ data DaemonStatus = DaemonStatus , sanityCheckRunning :: Bool -- Last time the sanity checker ran , lastSanityCheck :: Maybe POSIXTime - -- Files that are in the process of being added to the annex. - , pendingAdd :: S.Set KeySource } deriving (Show) @@ -40,17 +36,13 @@ newDaemonStatus = DaemonStatus , lastRunning = Nothing , sanityCheckRunning = False , lastSanityCheck = Nothing - , pendingAdd = S.empty } getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus getDaemonStatus = liftIO . readMVar modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex () -modifyDaemonStatus handle a = modifyDaemonStatusM handle (return . a) - -modifyDaemonStatusM :: DaemonStatusHandle -> (DaemonStatus -> IO DaemonStatus) -> Annex () -modifyDaemonStatusM handle a = liftIO $ modifyMVar_ handle a +modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a) {- Load any previous daemon status file, and store it in the MVar for this - process to use as its DaemonStatus. -} diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index cb7ede920..db58f01e8 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -15,13 +15,14 @@ import Assistant.DaemonStatus import Assistant.Changes import Utility.DirWatcher import Utility.Types.DirWatcher +import qualified Annex import qualified Annex.Queue import qualified Git.Command import qualified Git.UpdateIndex import qualified Git.HashObject import qualified Git.LsFiles import qualified Backend -import qualified Annex +import qualified Command.Add import Annex.Content import Annex.CatFile import Git.Types @@ -110,22 +111,27 @@ runHandler st dstatus changechan handler file filestatus = void $ do - and only one has just closed it. We want to avoid adding a file to the - annex that is open for write, to avoid anything being able to change it. - - - We could run lsof on the file here to check for other writer. - - But, that's slow. Instead, a Change is returned that indicates this file - - still needs to be added. The committer will handle bundles of these - - Changes at once. + - We could run lsof on the file here to check for other writers. + - But, that's slow, and even if there is currently a writer, we will want + - to add the file *eventually*. Instead, the file is locked down as a hard + - link in a temp directory, with its write bits disabled, for later + - checking with lsof, and a Change is returned containing a KeySource + - using that hard link. The committer handles running lsof and finishing + - the add. -} onAdd :: Handler -onAdd file _filestatus dstatus = do - ifM (scanComplete <$> getDaemonStatus dstatus) - ( go - , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file])) - ( noChange - , go +onAdd file filestatus dstatus + | maybe False isRegularFile filestatus = do + ifM (scanComplete <$> getDaemonStatus dstatus) + ( go + , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file])) + ( noChange + , go + ) ) - ) + | otherwise = noChange where - go = madeChange file PendingAddChange + go = pendingAddChange =<< Command.Add.lockDown file {- A symlink might be an arbitrary symlink, which is just added. - Or, if it is a git-annex symlink, ensure it points to the content |