diff options
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/Changes.hs | 18 | ||||
-rw-r--r-- | Assistant/Threads/Committer.hs | 146 | ||||
-rw-r--r-- | Assistant/Threads/SanityChecker.hs | 2 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 85 |
4 files changed, 121 insertions, 130 deletions
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs index eca922109..cccc372c1 100644 --- a/Assistant/Changes.hs +++ b/Assistant/Changes.hs @@ -27,6 +27,10 @@ data Change } | PendingAddChange { changeTime ::UTCTime + , changeFile :: FilePath + } + | InProcessAddChange + { changeTime ::UTCTime , keySource :: KeySource } deriving (Show) @@ -44,17 +48,21 @@ madeChange f t = do noChange :: Annex (Maybe Change) noChange = return Nothing -{- Indicates an add is in progress. -} -pendingAddChange :: KeySource -> Annex (Maybe Change) -pendingAddChange ks = - liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure ks) +{- Indicates an add needs to be done, but has not started yet. -} +pendingAddChange :: FilePath -> Annex (Maybe Change) +pendingAddChange f = + liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure f) isPendingAddChange :: Change -> Bool isPendingAddChange (PendingAddChange {}) = True isPendingAddChange _ = False +isInProcessAddChange :: Change -> Bool +isInProcessAddChange (InProcessAddChange {}) = True +isInProcessAddChange _ = False + finishedChange :: Change -> Change -finishedChange c@(PendingAddChange { keySource = ks }) = Change +finishedChange c@(InProcessAddChange { keySource = ks }) = Change { changeTime = changeTime c , changeFile = keyFilename ks , changeType = AddChange diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index 62058f4f4..ab70e02bb 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -5,6 +5,8 @@ - Licensed under the GNU GPL version 3 or higher. -} +{-# LANGUAGE CPP #-} + module Assistant.Threads.Committer where import Assistant.Common @@ -16,10 +18,10 @@ import Assistant.Threads.Watcher import Assistant.TransferQueue import Assistant.DaemonStatus import Logs.Transfer -import qualified Annex import qualified Annex.Queue import qualified Git.Command import qualified Git.HashObject +import qualified Git.LsFiles import qualified Git.Version import Git.Types import qualified Command.Add @@ -27,6 +29,7 @@ import Utility.ThreadScheduler import qualified Utility.Lsof as Lsof import qualified Utility.DirWatcher as DirWatcher import Types.KeySource +import Config import Data.Time.Clock import Data.Tuple.Utils @@ -38,28 +41,32 @@ thisThread = "Committer" {- This thread makes git commits at appropriate times. -} commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread -commitThread st changechan commitchan transferqueue dstatus = thread $ runEvery (Seconds 1) $ do - -- We already waited one second as a simple rate limiter. - -- Next, wait until at least one change is available for - -- processing. - changes <- getChanges changechan - -- Now see if now's a good time to commit. - time <- getCurrentTime - if shouldCommit time changes - then do - readychanges <- handleAdds st changechan transferqueue dstatus changes - if shouldCommit time readychanges - then do - debug thisThread - [ "committing" - , show (length readychanges) - , "changes" - ] - void $ alertWhile dstatus commitAlert $ - runThreadState st commitStaged - recordCommit commitchan (Commit time) - else refill readychanges - else refill changes +commitThread st changechan commitchan transferqueue dstatus = thread $ do + delayadd <- runThreadState st $ + maybe delayaddDefault (Just . Seconds) . readish + <$> getConfig (annexConfig "delayadd") "" + runEvery (Seconds 1) $ do + -- We already waited one second as a simple rate limiter. + -- Next, wait until at least one change is available for + -- processing. + changes <- getChanges changechan + -- Now see if now's a good time to commit. + time <- getCurrentTime + if shouldCommit time changes + then do + readychanges <- handleAdds delayadd st changechan transferqueue dstatus changes + if shouldCommit time readychanges + then do + debug thisThread + [ "committing" + , show (length readychanges) + , "changes" + ] + void $ alertWhile dstatus commitAlert $ + runThreadState st commitStaged + recordCommit commitchan (Commit time) + else refill readychanges + else refill changes where thread = NamedThread thisThread refill [] = noop @@ -109,13 +116,24 @@ shouldCommit now changes len = length changes thisSecond c = now `diffUTCTime` changeTime c <= 1 -{- If there are PendingAddChanges, the files have not yet actually been - - added to the annex (probably), and that has to be done now, before - - committing. +{- OSX needs a short delay after a file is added before locking it down, + - as pasting a file seems to try to set file permissions or otherwise + - access the file after closing it. -} +delayaddDefault :: Maybe Seconds +#ifdef darwin_HOST_OS +delayaddDefault = Just $ Seconds 1 +#else +delayaddDefault = Nothing +#endif + +{- If there are PendingAddChanges, or InProcessAddChanges, the files + - have not yet actually been added to the annex, and that has to be done + - now, before committing. - - Deferring the adds to this point causes batches to be bundled together, - which allows faster checking with lsof that the files are not still open - - for write by some other process. + - for write by some other process, and faster checking with git-ls-files + - that the files are not already checked into git. - - When a file is added, Inotify will notice the new symlink. So this waits - for additional Changes to arrive, so that the symlink has hopefully been @@ -128,10 +146,11 @@ shouldCommit now changes - Any pending adds that are not ready yet are put back into the ChangeChan, - where they will be retried later. -} -handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change] -handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do - (postponed, toadd) <- partitionEithers <$> - safeToAdd st pendingadds +handleAdds :: Maybe Seconds -> ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change] +handleAdds delayadd st changechan transferqueue dstatus cs = returnWhen (null incomplete) $ do + let (pending, inprocess) = partition isPendingAddChange incomplete + pending' <- findnew pending + (postponed, toadd) <- partitionEithers <$> safeToAdd delayadd st pending' inprocess unless (null postponed) $ refillChanges changechan postponed @@ -141,18 +160,26 @@ handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds if DirWatcher.eventsCoalesce || null added then return $ added ++ otherchanges else do - r <- handleAdds st changechan transferqueue dstatus + r <- handleAdds delayadd st changechan transferqueue dstatus =<< getChanges changechan return $ r ++ added ++ otherchanges where - (pendingadds, otherchanges) = partition isPendingAddChange cs + (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs + + findnew [] = return [] + findnew pending = do + newfiles <- runThreadState st $ + inRepo (Git.LsFiles.notInRepo False $ map changeFile pending) + -- note: timestamp info is lost here + let ts = changeTime (pending !! 0) + return $ map (PendingAddChange ts) newfiles returnWhen c a | c = return otherchanges | otherwise = a add :: Change -> IO (Maybe Change) - add change@(PendingAddChange { keySource = ks }) = + add change@(InProcessAddChange { keySource = ks }) = alertWhile' dstatus (addFileAlert $ keyFilename ks) $ liftM ret $ catchMaybeIO $ sanitycheck ks $ runThreadState st $ do @@ -190,38 +217,43 @@ handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds then a else return Nothing -{- PendingAddChanges can Either be Right to be added now, +{- Files can Either be Right to be added now, - or are unsafe, and must be Left for later. - - Check by running lsof on the temp directory, which - the KeySources are locked down in. -} -safeToAdd :: ThreadState -> [Change] -> IO [Either Change Change] -safeToAdd st changes = runThreadState st $ - ifM (Annex.getState Annex.force) - ( allRight changes -- force bypasses lsof check - , do - tmpdir <- fromRepo gitAnnexTmpDir - openfiles <- S.fromList . map fst3 . filter openwrite <$> - liftIO (Lsof.queryDir tmpdir) - - let checked = map (check openfiles) changes - - {- If new events are received when files are closed, - - there's no need to retry any changes that cannot - - be done now. -} - if DirWatcher.closingTracked - then do - mapM_ canceladd $ lefts checked - allRight $ rights checked - else return checked - ) +safeToAdd :: Maybe Seconds -> ThreadState -> [Change] -> [Change] -> IO [Either Change Change] +safeToAdd _ _ [] [] = return [] +safeToAdd delayadd st pending inprocess = do + maybe noop threadDelaySeconds delayadd + runThreadState st $ do + keysources <- mapM Command.Add.lockDown (map changeFile pending) + let inprocess' = map mkinprocess (zip pending keysources) + tmpdir <- fromRepo gitAnnexTmpDir + openfiles <- S.fromList . map fst3 . filter openwrite <$> + liftIO (Lsof.queryDir tmpdir) + let checked = map (check openfiles) $ inprocess ++ inprocess' + + {- If new events are received when files are closed, + - there's no need to retry any changes that cannot + - be done now. -} + if DirWatcher.closingTracked + then do + mapM_ canceladd $ lefts checked + allRight $ rights checked + else return checked where - check openfiles change@(PendingAddChange { keySource = ks }) + check openfiles change@(InProcessAddChange { keySource = ks }) | S.member (contentLocation ks) openfiles = Left change check _ change = Right change - canceladd (PendingAddChange { keySource = ks }) = do + mkinprocess (c, ks) = InProcessAddChange + { changeTime = changeTime c + , keySource = ks + } + + canceladd (InProcessAddChange { keySource = ks }) = do warning $ keyFilename ks ++ " still has writers, not adding" -- remove the hard link diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs index b99d72802..148ae1435 100644 --- a/Assistant/Threads/SanityChecker.hs +++ b/Assistant/Threads/SanityChecker.hs @@ -93,7 +93,7 @@ check st dstatus transferqueue changechan = do runThreadState st $ warning msg void $ addAlert dstatus $ sanityCheckFixAlert msg addsymlink file s = do - Watcher.runHandler thisThread Nothing st dstatus + Watcher.runHandler thisThread st dstatus transferqueue changechan Watcher.onAddSymlink file s insanity $ "found unstaged symlink: " ++ file diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index 9c3f4a941..1bf9e8581 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -5,8 +5,6 @@ - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE CPP #-} - module Assistant.Threads.Watcher ( watchThread, checkCanWatch, @@ -30,14 +28,10 @@ import qualified Annex.Queue import qualified Git.Command import qualified Git.UpdateIndex import qualified Git.HashObject -import qualified Git.LsFiles import qualified Backend -import qualified Command.Add import Annex.Content import Annex.CatFile import Git.Types -import Config -import Utility.ThreadScheduler import Data.Bits.Utils import qualified Data.ByteString.Lazy as L @@ -60,32 +54,19 @@ needLsof = error $ unlines , "Be warned: This can corrupt data in the annex, and make fsck complain." ] -{- OSX needs a short delay after a file is added before locking it down, - - as pasting a file seems to try to set file permissions or otherwise - - access the file after closing it. -} -delayaddDefault :: Maybe Seconds -#ifdef darwin_HOST_OS -delayaddDefault = Just $ Seconds 1 -#else -delayaddDefault = Nothing -#endif - watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do - delayadd <- runThreadState st $ - maybe delayaddDefault (Just . Seconds) . readish - <$> getConfig (annexConfig "delayadd") "" - void $ watchDir "." ignored (hooks delayadd) startup + void $ watchDir "." ignored hooks startup debug thisThread [ "watching", "."] where startup = startupScan st dstatus - hook delay a = Just $ runHandler thisThread delay st dstatus transferqueue changechan a - hooks delayadd = mkWatchHooks - { addHook = hook delayadd onAdd - , delHook = hook Nothing onDel - , addSymlinkHook = hook Nothing onAddSymlink - , delDirHook = hook Nothing onDelDir - , errHook = hook Nothing onErr + hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a + hooks = mkWatchHooks + { addHook = hook onAdd + , delHook = hook onDel + , addSymlinkHook = hook onAddSymlink + , delDirHook = hook onDelDir + , errHook = hook onErr } {- Initial scartup scan. The action should return once the scan is complete. -} @@ -113,65 +94,35 @@ ignored = ig . takeFileName ig ".gitattributes" = True ig _ = False -type Handler = ThreadName -> Maybe Seconds -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change) +type Handler = ThreadName -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change) {- Runs an action handler, inside the Annex monad, and if there was a - change, adds it to the ChangeChan. - - Exceptions are ignored, otherwise a whole watcher thread could be crashed. -} -runHandler :: ThreadName -> Maybe Seconds -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler threadname delay st dstatus transferqueue changechan handler file filestatus = void $ do +runHandler :: ThreadName -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler threadname st dstatus transferqueue changechan handler file filestatus = void $ do r <- tryIO go case r of Left e -> print e Right Nothing -> noop Right (Just change) -> recordChange changechan change where - go = runThreadState st $ handler threadname delay file filestatus dstatus transferqueue + go = runThreadState st $ handler threadname file filestatus dstatus transferqueue -{- During initial directory scan, this will be run for any regular files - - that are already checked into git. We don't want to turn those into - - symlinks, so do a check. This is rather expensive, but only happens - - during startup. - - - - It's possible for the file to still be open for write by some process. - - This can happen in a few ways; one is if two processes had the file open - - and only one has just closed it. We want to avoid adding a file to the - - annex that is open for write, to avoid anything being able to change it. - - - - We could run lsof on the file here to check for other writers. - - But, that's slow, and even if there is currently a writer, we will want - - to add the file *eventually*. Instead, the file is locked down as a hard - - link in a temp directory, with its write bits disabled, for later - - checking with lsof, and a Change is returned containing a KeySource - - using that hard link. The committer handles running lsof and finishing - - the add. - -} onAdd :: Handler -onAdd threadname delay file filestatus dstatus _ - | maybe False isRegularFile filestatus = - ifM (scanComplete <$> liftIO (getDaemonStatus dstatus)) - ( go - , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file])) - ( noChange - , go - ) - ) +onAdd _ file filestatus _ _ + | maybe False isRegularFile filestatus = pendingAddChange file | otherwise = noChange where - go = do - liftIO $ do - debug threadname ["file added", file] - maybe noop threadDelaySeconds delay - pendingAddChange =<< Command.Add.lockDown file {- A symlink might be an arbitrary symlink, which is just added. - Or, if it is a git-annex symlink, ensure it points to the content - before adding it. -} onAddSymlink :: Handler -onAddSymlink threadname _ file filestatus dstatus transferqueue = go =<< Backend.lookupFile file +onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.lookupFile file where go (Just (key, _)) = do link <- calcGitLink file key @@ -232,7 +183,7 @@ onAddSymlink threadname _ file filestatus dstatus transferqueue = go =<< Backend | otherwise = noop onDel :: Handler -onDel threadname _ file _ _dstatus _ = do +onDel threadname file _ _dstatus _ = do liftIO $ debug threadname ["file deleted", file] Annex.Queue.addUpdateIndex =<< inRepo (Git.UpdateIndex.unstageFile file) @@ -246,7 +197,7 @@ onDel threadname _ file _ _dstatus _ = do - command to get the recursive list of files in the directory, so rm is - just as good. -} onDelDir :: Handler -onDelDir threadname _ dir _ _dstatus _ = do +onDelDir threadname dir _ _dstatus _ = do liftIO $ debug threadname ["directory deleted", dir] Annex.Queue.addCommand "rm" [Params "--quiet -r --cached --ignore-unmatch --"] [dir] @@ -254,7 +205,7 @@ onDelDir threadname _ dir _ _dstatus _ = do {- Called when there's an error with inotify or kqueue. -} onErr :: Handler -onErr _ _ msg _ dstatus _ = do +onErr _ msg _ dstatus _ = do warning msg void $ liftIO $ addAlert dstatus $ warningAlert "watcher" msg return Nothing |