summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/Changes.hs18
-rw-r--r--Assistant/Threads/Committer.hs146
-rw-r--r--Assistant/Threads/SanityChecker.hs2
-rw-r--r--Assistant/Threads/Watcher.hs85
4 files changed, 121 insertions, 130 deletions
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs
index eca922109..cccc372c1 100644
--- a/Assistant/Changes.hs
+++ b/Assistant/Changes.hs
@@ -27,6 +27,10 @@ data Change
}
| PendingAddChange
{ changeTime ::UTCTime
+ , changeFile :: FilePath
+ }
+ | InProcessAddChange
+ { changeTime ::UTCTime
, keySource :: KeySource
}
deriving (Show)
@@ -44,17 +48,21 @@ madeChange f t = do
noChange :: Annex (Maybe Change)
noChange = return Nothing
-{- Indicates an add is in progress. -}
-pendingAddChange :: KeySource -> Annex (Maybe Change)
-pendingAddChange ks =
- liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure ks)
+{- Indicates an add needs to be done, but has not started yet. -}
+pendingAddChange :: FilePath -> Annex (Maybe Change)
+pendingAddChange f =
+ liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure f)
isPendingAddChange :: Change -> Bool
isPendingAddChange (PendingAddChange {}) = True
isPendingAddChange _ = False
+isInProcessAddChange :: Change -> Bool
+isInProcessAddChange (InProcessAddChange {}) = True
+isInProcessAddChange _ = False
+
finishedChange :: Change -> Change
-finishedChange c@(PendingAddChange { keySource = ks }) = Change
+finishedChange c@(InProcessAddChange { keySource = ks }) = Change
{ changeTime = changeTime c
, changeFile = keyFilename ks
, changeType = AddChange
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
index 62058f4f4..ab70e02bb 100644
--- a/Assistant/Threads/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -5,6 +5,8 @@
- Licensed under the GNU GPL version 3 or higher.
-}
+{-# LANGUAGE CPP #-}
+
module Assistant.Threads.Committer where
import Assistant.Common
@@ -16,10 +18,10 @@ import Assistant.Threads.Watcher
import Assistant.TransferQueue
import Assistant.DaemonStatus
import Logs.Transfer
-import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
import qualified Git.HashObject
+import qualified Git.LsFiles
import qualified Git.Version
import Git.Types
import qualified Command.Add
@@ -27,6 +29,7 @@ import Utility.ThreadScheduler
import qualified Utility.Lsof as Lsof
import qualified Utility.DirWatcher as DirWatcher
import Types.KeySource
+import Config
import Data.Time.Clock
import Data.Tuple.Utils
@@ -38,28 +41,32 @@ thisThread = "Committer"
{- This thread makes git commits at appropriate times. -}
commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread
-commitThread st changechan commitchan transferqueue dstatus = thread $ runEvery (Seconds 1) $ do
- -- We already waited one second as a simple rate limiter.
- -- Next, wait until at least one change is available for
- -- processing.
- changes <- getChanges changechan
- -- Now see if now's a good time to commit.
- time <- getCurrentTime
- if shouldCommit time changes
- then do
- readychanges <- handleAdds st changechan transferqueue dstatus changes
- if shouldCommit time readychanges
- then do
- debug thisThread
- [ "committing"
- , show (length readychanges)
- , "changes"
- ]
- void $ alertWhile dstatus commitAlert $
- runThreadState st commitStaged
- recordCommit commitchan (Commit time)
- else refill readychanges
- else refill changes
+commitThread st changechan commitchan transferqueue dstatus = thread $ do
+ delayadd <- runThreadState st $
+ maybe delayaddDefault (Just . Seconds) . readish
+ <$> getConfig (annexConfig "delayadd") ""
+ runEvery (Seconds 1) $ do
+ -- We already waited one second as a simple rate limiter.
+ -- Next, wait until at least one change is available for
+ -- processing.
+ changes <- getChanges changechan
+ -- Now see if now's a good time to commit.
+ time <- getCurrentTime
+ if shouldCommit time changes
+ then do
+ readychanges <- handleAdds delayadd st changechan transferqueue dstatus changes
+ if shouldCommit time readychanges
+ then do
+ debug thisThread
+ [ "committing"
+ , show (length readychanges)
+ , "changes"
+ ]
+ void $ alertWhile dstatus commitAlert $
+ runThreadState st commitStaged
+ recordCommit commitchan (Commit time)
+ else refill readychanges
+ else refill changes
where
thread = NamedThread thisThread
refill [] = noop
@@ -109,13 +116,24 @@ shouldCommit now changes
len = length changes
thisSecond c = now `diffUTCTime` changeTime c <= 1
-{- If there are PendingAddChanges, the files have not yet actually been
- - added to the annex (probably), and that has to be done now, before
- - committing.
+{- OSX needs a short delay after a file is added before locking it down,
+ - as pasting a file seems to try to set file permissions or otherwise
+ - access the file after closing it. -}
+delayaddDefault :: Maybe Seconds
+#ifdef darwin_HOST_OS
+delayaddDefault = Just $ Seconds 1
+#else
+delayaddDefault = Nothing
+#endif
+
+{- If there are PendingAddChanges, or InProcessAddChanges, the files
+ - have not yet actually been added to the annex, and that has to be done
+ - now, before committing.
-
- Deferring the adds to this point causes batches to be bundled together,
- which allows faster checking with lsof that the files are not still open
- - for write by some other process.
+ - for write by some other process, and faster checking with git-ls-files
+ - that the files are not already checked into git.
-
- When a file is added, Inotify will notice the new symlink. So this waits
- for additional Changes to arrive, so that the symlink has hopefully been
@@ -128,10 +146,11 @@ shouldCommit now changes
- Any pending adds that are not ready yet are put back into the ChangeChan,
- where they will be retried later.
-}
-handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
-handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do
- (postponed, toadd) <- partitionEithers <$>
- safeToAdd st pendingadds
+handleAdds :: Maybe Seconds -> ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
+handleAdds delayadd st changechan transferqueue dstatus cs = returnWhen (null incomplete) $ do
+ let (pending, inprocess) = partition isPendingAddChange incomplete
+ pending' <- findnew pending
+ (postponed, toadd) <- partitionEithers <$> safeToAdd delayadd st pending' inprocess
unless (null postponed) $
refillChanges changechan postponed
@@ -141,18 +160,26 @@ handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds
if DirWatcher.eventsCoalesce || null added
then return $ added ++ otherchanges
else do
- r <- handleAdds st changechan transferqueue dstatus
+ r <- handleAdds delayadd st changechan transferqueue dstatus
=<< getChanges changechan
return $ r ++ added ++ otherchanges
where
- (pendingadds, otherchanges) = partition isPendingAddChange cs
+ (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs
+
+ findnew [] = return []
+ findnew pending = do
+ newfiles <- runThreadState st $
+ inRepo (Git.LsFiles.notInRepo False $ map changeFile pending)
+ -- note: timestamp info is lost here
+ let ts = changeTime (pending !! 0)
+ return $ map (PendingAddChange ts) newfiles
returnWhen c a
| c = return otherchanges
| otherwise = a
add :: Change -> IO (Maybe Change)
- add change@(PendingAddChange { keySource = ks }) =
+ add change@(InProcessAddChange { keySource = ks }) =
alertWhile' dstatus (addFileAlert $ keyFilename ks) $
liftM ret $ catchMaybeIO $
sanitycheck ks $ runThreadState st $ do
@@ -190,38 +217,43 @@ handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds
then a
else return Nothing
-{- PendingAddChanges can Either be Right to be added now,
+{- Files can Either be Right to be added now,
- or are unsafe, and must be Left for later.
-
- Check by running lsof on the temp directory, which
- the KeySources are locked down in.
-}
-safeToAdd :: ThreadState -> [Change] -> IO [Either Change Change]
-safeToAdd st changes = runThreadState st $
- ifM (Annex.getState Annex.force)
- ( allRight changes -- force bypasses lsof check
- , do
- tmpdir <- fromRepo gitAnnexTmpDir
- openfiles <- S.fromList . map fst3 . filter openwrite <$>
- liftIO (Lsof.queryDir tmpdir)
-
- let checked = map (check openfiles) changes
-
- {- If new events are received when files are closed,
- - there's no need to retry any changes that cannot
- - be done now. -}
- if DirWatcher.closingTracked
- then do
- mapM_ canceladd $ lefts checked
- allRight $ rights checked
- else return checked
- )
+safeToAdd :: Maybe Seconds -> ThreadState -> [Change] -> [Change] -> IO [Either Change Change]
+safeToAdd _ _ [] [] = return []
+safeToAdd delayadd st pending inprocess = do
+ maybe noop threadDelaySeconds delayadd
+ runThreadState st $ do
+ keysources <- mapM Command.Add.lockDown (map changeFile pending)
+ let inprocess' = map mkinprocess (zip pending keysources)
+ tmpdir <- fromRepo gitAnnexTmpDir
+ openfiles <- S.fromList . map fst3 . filter openwrite <$>
+ liftIO (Lsof.queryDir tmpdir)
+ let checked = map (check openfiles) $ inprocess ++ inprocess'
+
+ {- If new events are received when files are closed,
+ - there's no need to retry any changes that cannot
+ - be done now. -}
+ if DirWatcher.closingTracked
+ then do
+ mapM_ canceladd $ lefts checked
+ allRight $ rights checked
+ else return checked
where
- check openfiles change@(PendingAddChange { keySource = ks })
+ check openfiles change@(InProcessAddChange { keySource = ks })
| S.member (contentLocation ks) openfiles = Left change
check _ change = Right change
- canceladd (PendingAddChange { keySource = ks }) = do
+ mkinprocess (c, ks) = InProcessAddChange
+ { changeTime = changeTime c
+ , keySource = ks
+ }
+
+ canceladd (InProcessAddChange { keySource = ks }) = do
warning $ keyFilename ks
++ " still has writers, not adding"
-- remove the hard link
diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs
index b99d72802..148ae1435 100644
--- a/Assistant/Threads/SanityChecker.hs
+++ b/Assistant/Threads/SanityChecker.hs
@@ -93,7 +93,7 @@ check st dstatus transferqueue changechan = do
runThreadState st $ warning msg
void $ addAlert dstatus $ sanityCheckFixAlert msg
addsymlink file s = do
- Watcher.runHandler thisThread Nothing st dstatus
+ Watcher.runHandler thisThread st dstatus
transferqueue changechan
Watcher.onAddSymlink file s
insanity $ "found unstaged symlink: " ++ file
diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs
index 9c3f4a941..1bf9e8581 100644
--- a/Assistant/Threads/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -5,8 +5,6 @@
- Licensed under the GNU GPL version 3 or higher.
-}
-{-# LANGUAGE CPP #-}
-
module Assistant.Threads.Watcher (
watchThread,
checkCanWatch,
@@ -30,14 +28,10 @@ import qualified Annex.Queue
import qualified Git.Command
import qualified Git.UpdateIndex
import qualified Git.HashObject
-import qualified Git.LsFiles
import qualified Backend
-import qualified Command.Add
import Annex.Content
import Annex.CatFile
import Git.Types
-import Config
-import Utility.ThreadScheduler
import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L
@@ -60,32 +54,19 @@ needLsof = error $ unlines
, "Be warned: This can corrupt data in the annex, and make fsck complain."
]
-{- OSX needs a short delay after a file is added before locking it down,
- - as pasting a file seems to try to set file permissions or otherwise
- - access the file after closing it. -}
-delayaddDefault :: Maybe Seconds
-#ifdef darwin_HOST_OS
-delayaddDefault = Just $ Seconds 1
-#else
-delayaddDefault = Nothing
-#endif
-
watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread
watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do
- delayadd <- runThreadState st $
- maybe delayaddDefault (Just . Seconds) . readish
- <$> getConfig (annexConfig "delayadd") ""
- void $ watchDir "." ignored (hooks delayadd) startup
+ void $ watchDir "." ignored hooks startup
debug thisThread [ "watching", "."]
where
startup = startupScan st dstatus
- hook delay a = Just $ runHandler thisThread delay st dstatus transferqueue changechan a
- hooks delayadd = mkWatchHooks
- { addHook = hook delayadd onAdd
- , delHook = hook Nothing onDel
- , addSymlinkHook = hook Nothing onAddSymlink
- , delDirHook = hook Nothing onDelDir
- , errHook = hook Nothing onErr
+ hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a
+ hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , delHook = hook onDel
+ , addSymlinkHook = hook onAddSymlink
+ , delDirHook = hook onDelDir
+ , errHook = hook onErr
}
{- Initial scartup scan. The action should return once the scan is complete. -}
@@ -113,65 +94,35 @@ ignored = ig . takeFileName
ig ".gitattributes" = True
ig _ = False
-type Handler = ThreadName -> Maybe Seconds -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change)
+type Handler = ThreadName -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change)
{- Runs an action handler, inside the Annex monad, and if there was a
- change, adds it to the ChangeChan.
-
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
-}
-runHandler :: ThreadName -> Maybe Seconds -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
-runHandler threadname delay st dstatus transferqueue changechan handler file filestatus = void $ do
+runHandler :: ThreadName -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler threadname st dstatus transferqueue changechan handler file filestatus = void $ do
r <- tryIO go
case r of
Left e -> print e
Right Nothing -> noop
Right (Just change) -> recordChange changechan change
where
- go = runThreadState st $ handler threadname delay file filestatus dstatus transferqueue
+ go = runThreadState st $ handler threadname file filestatus dstatus transferqueue
-{- During initial directory scan, this will be run for any regular files
- - that are already checked into git. We don't want to turn those into
- - symlinks, so do a check. This is rather expensive, but only happens
- - during startup.
- -
- - It's possible for the file to still be open for write by some process.
- - This can happen in a few ways; one is if two processes had the file open
- - and only one has just closed it. We want to avoid adding a file to the
- - annex that is open for write, to avoid anything being able to change it.
- -
- - We could run lsof on the file here to check for other writers.
- - But, that's slow, and even if there is currently a writer, we will want
- - to add the file *eventually*. Instead, the file is locked down as a hard
- - link in a temp directory, with its write bits disabled, for later
- - checking with lsof, and a Change is returned containing a KeySource
- - using that hard link. The committer handles running lsof and finishing
- - the add.
- -}
onAdd :: Handler
-onAdd threadname delay file filestatus dstatus _
- | maybe False isRegularFile filestatus =
- ifM (scanComplete <$> liftIO (getDaemonStatus dstatus))
- ( go
- , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
- ( noChange
- , go
- )
- )
+onAdd _ file filestatus _ _
+ | maybe False isRegularFile filestatus = pendingAddChange file
| otherwise = noChange
where
- go = do
- liftIO $ do
- debug threadname ["file added", file]
- maybe noop threadDelaySeconds delay
- pendingAddChange =<< Command.Add.lockDown file
{- A symlink might be an arbitrary symlink, which is just added.
- Or, if it is a git-annex symlink, ensure it points to the content
- before adding it.
-}
onAddSymlink :: Handler
-onAddSymlink threadname _ file filestatus dstatus transferqueue = go =<< Backend.lookupFile file
+onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.lookupFile file
where
go (Just (key, _)) = do
link <- calcGitLink file key
@@ -232,7 +183,7 @@ onAddSymlink threadname _ file filestatus dstatus transferqueue = go =<< Backend
| otherwise = noop
onDel :: Handler
-onDel threadname _ file _ _dstatus _ = do
+onDel threadname file _ _dstatus _ = do
liftIO $ debug threadname ["file deleted", file]
Annex.Queue.addUpdateIndex =<<
inRepo (Git.UpdateIndex.unstageFile file)
@@ -246,7 +197,7 @@ onDel threadname _ file _ _dstatus _ = do
- command to get the recursive list of files in the directory, so rm is
- just as good. -}
onDelDir :: Handler
-onDelDir threadname _ dir _ _dstatus _ = do
+onDelDir threadname dir _ _dstatus _ = do
liftIO $ debug threadname ["directory deleted", dir]
Annex.Queue.addCommand "rm"
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
@@ -254,7 +205,7 @@ onDelDir threadname _ dir _ _dstatus _ = do
{- Called when there's an error with inotify or kqueue. -}
onErr :: Handler
-onErr _ _ msg _ dstatus _ = do
+onErr _ msg _ dstatus _ = do
warning msg
void $ liftIO $ addAlert dstatus $ warningAlert "watcher" msg
return Nothing