diff options
author | Joey Hess <joey@kitenet.net> | 2012-06-13 12:36:33 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-06-13 12:46:39 -0400 |
commit | ccc50052453ccaf2db0c371c5c36b5eea3e9191a (patch) | |
tree | dfb2ce0b7bfed5c8490cda68b6cd733bef72c473 | |
parent | c31ddeda84542414dd58e03473a23a6de8890390 (diff) |
reorganize
-rw-r--r-- | Assistant.hs | 74 | ||||
-rw-r--r-- | Assistant/Committer.hs | 104 | ||||
-rw-r--r-- | Assistant/DaemonStatus.hs | 35 | ||||
-rw-r--r-- | Assistant/ThreadedMonad.hs | 40 | ||||
-rw-r--r-- | Assistant/Watcher.hs | 206 | ||||
-rw-r--r-- | Command/Watch.hs | 386 |
6 files changed, 463 insertions, 382 deletions
diff --git a/Assistant.hs b/Assistant.hs new file mode 100644 index 000000000..bc394bd99 --- /dev/null +++ b/Assistant.hs @@ -0,0 +1,74 @@ +{- git-annex assistant daemon + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + - + - Overview of threads and MVars, etc: + - + - Thread 1: parent + - The initial thread run, double forks to background, starts other + - threads, and then stops, waiting for them to terminate, + - or for a ctrl-c. + - Thread 2: inotify + - Notices new files, and calls handlers for events, queuing changes. + - Thread 3: inotify internal + - Used by haskell inotify library to ensure inotify event buffer is + - kept drained. + - Thread 4: inotify initial scan + - A MVar lock is used to prevent other inotify handlers from running + - until this is complete. + - Thread 5: committer + - Waits for changes to occur, and runs the git queue to update its + - index, then commits. + - Thread 6: status logger + - Wakes up periodically and records the daemon's status to disk. + - + - ThreadState: (MVar) + - The Annex state is stored here, which allows resuscitating the + - Annex monad in IO actions run by the inotify and committer + - threads. Thus, a single state is shared amoung the threads, and + - only one at a time can access it. + - DaemonStatusHandle: (MVar) + - The daemon's current status. This MVar should only be manipulated + - from inside the Annex monad, which ensures it's accessed only + - after the ThreadState MVar. + - ChangeChan: (STM TChan) + - Changes are indicated by writing to this channel. The committer + - reads from it. + -} + +module Assistant where + +import Common.Annex +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.Watcher +import Assistant.Committer +import qualified Utility.Daemon +import Utility.LogFile + +import Control.Concurrent + +startDaemon :: Bool -> Annex () +startDaemon foreground + | foreground = do + showStart "watch" "." + go id + | otherwise = do + logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile + pidfile <- fromRepo gitAnnexPidFile + go $ Utility.Daemon.daemonize logfd (Just pidfile) False + where + go a = withThreadState $ \st -> liftIO $ a $ do + dstatus <- startDaemonStatus + changechan <- newChangeChan + -- The commit thread is started early, so that the user + -- can immediately begin adding files and having them + -- committed, even while the startup scan is taking + -- place. + _ <- forkIO $ commitThread st changechan + watchThread st dstatus changechan + +stopDaemon :: Annex () +stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs new file mode 100644 index 000000000..d6fc08579 --- /dev/null +++ b/Assistant/Committer.hs @@ -0,0 +1,104 @@ +{- git-annex assistant change tracking and committing + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +module Assistant.Committer where + +import Common.Annex +import Assistant.ThreadedMonad +import qualified Annex.Queue +import qualified Git.Command + +import Control.Concurrent +import Control.Concurrent.STM +import Data.Time.Clock + +type ChangeChan = TChan Change + +data Change = Change + { changeTime :: UTCTime + , changeFile :: FilePath + , changeDesc :: String + } + deriving (Show) + +runChangeChan :: STM a -> IO a +runChangeChan = atomically + +newChangeChan :: IO ChangeChan +newChangeChan = atomically newTChan + +{- Handlers call this when they made a change that needs to get committed. -} +madeChange :: FilePath -> String -> Annex (Maybe Change) +madeChange file desc = do + -- Just in case the commit thread is not flushing the queue fast enough. + Annex.Queue.flushWhenFull + liftIO $ Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc) + +noChange :: Annex (Maybe Change) +noChange = return Nothing + +{- Gets all unhandled changes. + - Blocks until at least one change is made. -} +getChanges :: ChangeChan -> IO [Change] +getChanges chan = runChangeChan $ do + c <- readTChan chan + go [c] + where + go l = do + v <- tryReadTChan chan + case v of + Nothing -> return l + Just c -> go (c:l) + +{- Puts unhandled changes back into the channel. + - Note: Original order is not preserved. -} +refillChanges :: ChangeChan -> [Change] -> IO () +refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs + +{- This thread makes git commits at appropriate times. -} +commitThread :: ThreadState -> ChangeChan -> IO () +commitThread st changechan = forever $ do + -- First, a simple rate limiter. + threadDelay oneSecond + -- Next, wait until at least one change has been made. + cs <- getChanges changechan + -- Now see if now's a good time to commit. + time <- getCurrentTime + if shouldCommit time cs + then void $ tryIO $ runThreadState st commitStaged + else refillChanges changechan cs + where + oneSecond = 1000000 -- microseconds + +commitStaged :: Annex () +commitStaged = do + Annex.Queue.flush + inRepo $ Git.Command.run "commit" + [ Param "--allow-empty-message" + , Param "-m", Param "" + -- Empty commits may be made if tree changes cancel + -- each other out, etc + , Param "--allow-empty" + -- Avoid running the usual git-annex pre-commit hook; + -- watch does the same symlink fixing, and we don't want + -- to deal with unlocked files in these commits. + , Param "--quiet" + ] + +{- Decide if now is a good time to make a commit. + - Note that the list of change times has an undefined order. + - + - Current strategy: If there have been 10 commits within the past second, + - a batch activity is taking place, so wait for later. + -} +shouldCommit :: UTCTime -> [Change] -> Bool +shouldCommit now changes + | len == 0 = False + | len > 10000 = True -- avoid bloating queue too much + | length (filter thisSecond changes) < 10 = True + | otherwise = False -- batch activity + where + len = length changes + thisSecond c = now `diffUTCTime` changeTime c <= 1 diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs new file mode 100644 index 000000000..43a8c7e2f --- /dev/null +++ b/Assistant/DaemonStatus.hs @@ -0,0 +1,35 @@ +{- git-annex assistant daemon status + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +module Assistant.DaemonStatus where + +import Common.Annex + +import Control.Concurrent +import System.Posix.Types + +data DaemonStatus = DaemonStatus + -- False when the daemon is performing its startup scan + { scanComplete :: Bool + -- Time when a previous process of the daemon was running ok + , lastRunning :: Maybe EpochTime + } + +type DaemonStatusHandle = MVar DaemonStatus + +newDaemonStatus :: DaemonStatus +newDaemonStatus = DaemonStatus + { scanComplete = False + , lastRunning = Nothing + } + +startDaemonStatus :: IO DaemonStatusHandle +startDaemonStatus = newMVar newDaemonStatus + +getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus +getDaemonStatus = liftIO . readMVar + +modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex () +modifyDaemonStatus status a = liftIO $ modifyMVar_ status (return . a) diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs new file mode 100644 index 000000000..c4d331f61 --- /dev/null +++ b/Assistant/ThreadedMonad.hs @@ -0,0 +1,40 @@ +{- making the Annex monad available across threads + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +{-# LANGUAGE BangPatterns #-} + +module Assistant.ThreadedMonad where + +import Common.Annex +import qualified Annex + +import Control.Concurrent + +{- The Annex state is stored in a MVar, so that threaded actions can access + - it. -} +type ThreadState = MVar Annex.AnnexState + +{- Stores the Annex state in a MVar. + - + - Once the action is finished, retrieves the state from the MVar. + -} +withThreadState :: (ThreadState -> Annex a) -> Annex a +withThreadState a = do + state <- Annex.getState id + mvar <- liftIO $ newMVar state + r <- a mvar + newstate <- liftIO $ takeMVar mvar + Annex.changeState (const newstate) + return r + +{- Runs an Annex action, using the state from the MVar. + - + - This serializes calls by threads. -} +runThreadState :: ThreadState -> Annex a -> IO a +runThreadState mvar a = do + startstate <- takeMVar mvar + !(r, newstate) <- Annex.run startstate a + putMVar mvar newstate + return r diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs new file mode 100644 index 000000000..19a65db6e --- /dev/null +++ b/Assistant/Watcher.hs @@ -0,0 +1,206 @@ +{- git-annex assistant tree watcher + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Watcher where + +import Common.Annex +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.Committer +import Utility.ThreadLock +import qualified Annex.Queue +import qualified Command.Add +import qualified Git.Command +import qualified Git.UpdateIndex +import qualified Git.HashObject +import qualified Git.LsFiles +import qualified Backend +import Annex.Content +import Annex.CatFile +import Git.Types + +import Control.Concurrent.STM +import Data.Bits.Utils +import qualified Data.ByteString.Lazy as L + +#if defined linux_HOST_OS +import Utility.Inotify +import System.INotify +#endif + +type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change) + +watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () +#if defined linux_HOST_OS +watchThread st dstatus changechan = withINotify $ \i -> do + runThreadState st $ + showAction "scanning" + -- This does not return until the startup scan is done. + -- That can take some time for large trees. + watchDir i "." (ignored . takeFileName) hooks + runThreadState st $ + modifyDaemonStatus dstatus $ \s -> s { scanComplete = True } + -- Notice any files that were deleted before inotify + -- was started. + runThreadState st $ do + inRepo $ Git.Command.run "add" [Param "--update"] + showAction "started" + waitForTermination + where + hook a = Just $ runHandler st dstatus changechan a + hooks = WatchHooks + { addHook = hook onAdd + , delHook = hook onDel + , addSymlinkHook = hook onAddSymlink + , delDirHook = hook onDelDir + , errHook = hook onErr + } +#else +watchThread = error "so far only available on Linux" +#endif + +ignored :: FilePath -> Bool +ignored ".git" = True +ignored ".gitignore" = True +ignored ".gitattributes" = True +ignored _ = False + +{- Runs an action handler, inside the Annex monad, and if there was a + - change, adds it to the ChangeChan. + - + - Exceptions are ignored, otherwise a whole watcher thread could be crashed. + -} +runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler st dstatus changechan handler file filestatus = void $ do + r <- tryIO go + case r of + Left e -> print e + Right Nothing -> noop + Right (Just change) -> void $ + runChangeChan $ writeTChan changechan change + where + go = runThreadState st $ handler file filestatus dstatus + +{- Adding a file is tricky; the file has to be replaced with a symlink + - but this is race prone, as the symlink could be changed immediately + - after creation. To avoid that race, git add is not used to stage the + - symlink. + - + - Inotify will notice the new symlink, so this Handler does not stage it + - or return a Change, leaving that to onAddSymlink. + - + - During initial directory scan, this will be run for any files that + - are already checked into git. We don't want to turn those into symlinks, + - so do a check. This is rather expensive, but only happens during + - startup. + -} +onAdd :: Handler +onAdd file _filestatus dstatus = do + ifM (scanComplete <$> getDaemonStatus dstatus) + ( go + , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file])) + ( noChange + , go + ) + ) + where + go = do + showStart "add" file + handle =<< Command.Add.ingest file + noChange + handle Nothing = showEndFail + handle (Just key) = do + Command.Add.link file key True + showEndOk + +{- A symlink might be an arbitrary symlink, which is just added. + - Or, if it is a git-annex symlink, ensure it points to the content + - before adding it. + -} +onAddSymlink :: Handler +onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file + where + go (Just (key, _)) = do + link <- calcGitLink file key + ifM ((==) link <$> liftIO (readSymbolicLink file)) + ( ensurestaged link =<< getDaemonStatus dstatus + , do + liftIO $ removeFile file + liftIO $ createSymbolicLink link file + addlink link + ) + go Nothing = do -- other symlink + link <- liftIO (readSymbolicLink file) + ensurestaged link =<< getDaemonStatus dstatus + + {- This is often called on symlinks that are already + - staged correctly. A symlink may have been deleted + - and being re-added, or added when the watcher was + - not running. So they're normally restaged to make sure. + - + - As an optimisation, during the status scan, avoid + - restaging everything. Only links that were created since + - the last time the daemon was running are staged. + - (If the daemon has never ran before, avoid staging + - links too.) + -} + ensurestaged link daemonstatus + | scanComplete daemonstatus = addlink link + | otherwise = case filestatus of + Just s + | safe (statusChangeTime s) -> noChange + _ -> addlink link + where + safe t = maybe True (> t) (lastRunning daemonstatus) + + {- For speed, tries to reuse the existing blob for + - the symlink target. -} + addlink link = do + v <- catObjectDetails $ Ref $ ':':file + case v of + Just (currlink, sha) + | s2w8 link == L.unpack currlink -> + stageSymlink file sha + _ -> do + sha <- inRepo $ + Git.HashObject.hashObject BlobObject link + stageSymlink file sha + madeChange file "link" + +onDel :: Handler +onDel file _ _dstatus = do + Annex.Queue.addUpdateIndex =<< + inRepo (Git.UpdateIndex.unstageFile file) + madeChange file "rm" + +{- A directory has been deleted, or moved, so tell git to remove anything + - that was inside it from its cache. Since it could reappear at any time, + - use --cached to only delete it from the index. + - + - Note: This could use unstageFile, but would need to run another git + - command to get the recursive list of files in the directory, so rm is + - just as good. -} +onDelDir :: Handler +onDelDir dir _ _dstatus = do + Annex.Queue.addCommand "rm" + [Params "--quiet -r --cached --ignore-unmatch --"] [dir] + madeChange dir "rmdir" + +{- Called when there's an error with inotify. -} +onErr :: Handler +onErr msg _ _dstatus = do + warning msg + return Nothing + +{- Adds a symlink to the index, without ever accessing the actual symlink + - on disk. -} +stageSymlink :: FilePath -> Sha -> Annex () +stageSymlink file sha = + Annex.Queue.addUpdateIndex =<< + inRepo (Git.UpdateIndex.stageSymlink file sha) diff --git a/Command/Watch.hs b/Command/Watch.hs index 54be556c9..5681b3861 100644 --- a/Command/Watch.hs +++ b/Command/Watch.hs @@ -1,108 +1,20 @@ {-# LANGUAGE CPP #-} {-# LANGUAGE BangPatterns #-} -{- git-annex watch daemon +{- git-annex watch command - - Copyright 2012 Joey Hess <joey@kitenet.net> - - Licensed under the GNU GPL version 3 or higher. - - - - Overview of threads and MVars, etc: - - - - Thread 1: parent - - The initial thread run, double forks to background, starts other - - threads, and then stops, waiting for them to terminate, - - or for a ctrl-c. - - Thread 2: inotify - - Notices new files, and calls handlers for events, queuing changes. - - Thread 3: inotify internal - - Used by haskell inotify library to ensure inotify event buffer is - - kept drained. - - Thread 4: inotify initial scan - - A MVar lock is used to prevent other inotify handlers from running - - until this is complete. - - Thread 5: committer - - Waits for changes to occur, and runs the git queue to update its - - index, then commits. - - Thread 6: status logger - - Wakes up periodically and records the daemon's status to disk. - - - - State MVar: - - The Annex state is stored here, which allows resuscitating the - - Annex monad in IO actions run by the inotify and committer - - threads. Thus, a single state is shared amoung the threads, and - - only one at a time can access it. - - DaemonStatus MVar: - - The daemon's current status. This MVar should only be manipulated - - from inside the Annex monad, which ensures it's accessed only - - after the State MVar. - - ChangeChan STM TChan: - - Changes are indicated by writing to this channel. The committer - - reads from it. -} module Command.Watch where import Common.Annex +import Assistant import Command -import Utility.Daemon -import Utility.LogFile -import Utility.ThreadLock -import qualified Annex -import qualified Annex.Queue -import qualified Command.Add -import qualified Git.Command -import qualified Git.UpdateIndex -import qualified Git.HashObject -import qualified Git.LsFiles -import qualified Backend -import Annex.Content -import Annex.CatFile -import Git.Types import Option -import Control.Concurrent -import Control.Concurrent.STM -import Data.Time.Clock -import Data.Bits.Utils -import System.Posix.Types -import qualified Data.ByteString.Lazy as L - -#if defined linux_HOST_OS -import Utility.Inotify -import System.INotify -#endif - -data DaemonStatus = DaemonStatus - -- False when the daemon is performing its startup scan - { scanComplete :: Bool - -- Time when a previous process of the daemon was running ok - , lastRunning :: Maybe EpochTime - } - -newDaemonStatus :: Annex DaemonStatus -newDaemonStatus = return $ DaemonStatus - { scanComplete = False - , lastRunning = Nothing - } - -getDaemonStatus :: MVar DaemonStatus -> Annex DaemonStatus -getDaemonStatus = liftIO . readMVar - -modifyDaemonStatus :: MVar DaemonStatus -> (DaemonStatus -> DaemonStatus) -> Annex () -modifyDaemonStatus status a = liftIO $ modifyMVar_ status (return . a) - -type ChangeChan = TChan Change - -type Handler = FilePath -> Maybe FileStatus -> MVar DaemonStatus -> Annex (Maybe Change) - -data Change = Change - { changeTime :: UTCTime - , changeFile :: FilePath - , changeDesc :: String - } - deriving (Show) - def :: [Command] def = [withOptions [foregroundOption, stopOption] $ command "watch" paramNothing seek "watch for changes"] @@ -121,296 +33,6 @@ stopOption = Option.flag [] "stop" "stop daemon" start :: Bool -> Bool -> CommandStart start foreground stopdaemon = notBareRepo $ do if stopdaemon - then liftIO . stopDaemon =<< fromRepo gitAnnexPidFile - else withStateMVar $ startDaemon foreground + then stopDaemon + else startDaemon foreground -- does not return stop - -startDaemon :: Bool -> MVar Annex.AnnexState -> Annex () -startDaemon foreground st - | foreground = do - showStart "watch" "." - go id - | otherwise = do - logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile - pidfile <- fromRepo gitAnnexPidFile - go $ daemonize logfd (Just pidfile) False - where - go a = do - daemonstatus <- newDaemonStatus - liftIO $ a $ do - dstatus <- newMVar daemonstatus - changechan <- runChangeChan newTChan - watch st dstatus changechan - -watch :: MVar Annex.AnnexState -> MVar DaemonStatus -> ChangeChan -> IO () -#if defined linux_HOST_OS -watch st dstatus changechan = withINotify $ \i -> do - -- The commit thread is started early, so that the user - -- can immediately begin adding files and having them - -- committed, even while the startup scan is taking place. - _ <- forkIO $ commitThread st changechan - runStateMVar st $ - showAction "scanning" - -- This does not return until the startup scan is done. - -- That can take some time for large trees. - watchDir i "." (ignored . takeFileName) hooks - runStateMVar st $ - modifyDaemonStatus dstatus $ \s -> s { scanComplete = True } - -- Notice any files that were deleted before inotify - -- was started. - runStateMVar st $ do - inRepo $ Git.Command.run "add" [Param "--update"] - showAction "started" - waitForTermination - where - hook a = Just $ runHandler st dstatus changechan a - hooks = WatchHooks - { addHook = hook onAdd - , delHook = hook onDel - , addSymlinkHook = hook onAddSymlink - , delDirHook = hook onDelDir - , errHook = hook onErr - } -#else -watch = error "watch mode is so far only available on Linux" -#endif - -ignored :: FilePath -> Bool -ignored ".git" = True -ignored ".gitignore" = True -ignored ".gitattributes" = True -ignored _ = False - -{- Stores the Annex state in a MVar, so that threaded actions can access - - it. - - - - Once the action is finished, retrieves the state from the MVar. - -} -withStateMVar :: (MVar Annex.AnnexState -> Annex a) -> Annex a -withStateMVar a = do - state <- Annex.getState id - mvar <- liftIO $ newMVar state - r <- a mvar - newstate <- liftIO $ takeMVar mvar - Annex.changeState (const newstate) - return r - -{- Runs an Annex action, using the state from the MVar. -} -runStateMVar :: MVar Annex.AnnexState -> Annex a -> IO a -runStateMVar mvar a = do - startstate <- takeMVar mvar - !(r, newstate) <- Annex.run startstate a - putMVar mvar newstate - return r - -runChangeChan :: STM a -> IO a -runChangeChan = atomically - -{- Runs an action handler, inside the Annex monad, and if there was a - - change, adds it to the ChangeChan. - - - - Exceptions are ignored, otherwise a whole watcher thread could be crashed. - -} -runHandler :: MVar Annex.AnnexState -> MVar DaemonStatus -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler st dstatus changechan handler file filestatus = void $ do - r <- tryIO go - case r of - Left e -> print e - Right Nothing -> noop - Right (Just change) -> void $ - runChangeChan $ writeTChan changechan change - where - go = runStateMVar st $ handler file filestatus dstatus - -{- Handlers call this when they made a change that needs to get committed. -} -madeChange :: FilePath -> String -> Annex (Maybe Change) -madeChange file desc = do - -- Just in case the commit thread is not flushing the queue fast enough. - Annex.Queue.flushWhenFull - liftIO $ Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc) - -noChange :: Annex (Maybe Change) -noChange = return Nothing - -{- Adding a file is tricky; the file has to be replaced with a symlink - - but this is race prone, as the symlink could be changed immediately - - after creation. To avoid that race, git add is not used to stage the - - symlink. - - - - Inotify will notice the new symlink, so this Handler does not stage it - - or return a Change, leaving that to onAddSymlink. - - - - During initial directory scan, this will be run for any files that - - are already checked into git. We don't want to turn those into symlinks, - - so do a check. This is rather expensive, but only happens during - - startup. - -} -onAdd :: Handler -onAdd file _filestatus dstatus = do - ifM (scanComplete <$> getDaemonStatus dstatus) - ( go - , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file])) - ( noChange - , go - ) - ) - where - go = do - showStart "add" file - handle =<< Command.Add.ingest file - noChange - handle Nothing = showEndFail - handle (Just key) = do - Command.Add.link file key True - showEndOk - -{- A symlink might be an arbitrary symlink, which is just added. - - Or, if it is a git-annex symlink, ensure it points to the content - - before adding it. - - - -} -onAddSymlink :: Handler -onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file - where - go (Just (key, _)) = do - link <- calcGitLink file key - ifM ((==) link <$> liftIO (readSymbolicLink file)) - ( ensurestaged link =<< getDaemonStatus dstatus - , do - liftIO $ removeFile file - liftIO $ createSymbolicLink link file - addlink link - ) - go Nothing = do -- other symlink - link <- liftIO (readSymbolicLink file) - ensurestaged link =<< getDaemonStatus dstatus - - {- This is often called on symlinks that are already - - staged correctly. A symlink may have been deleted - - and being re-added, or added when the watcher was - - not running. So they're normally restaged to make sure. - - - - As an optimisation, during the status scan, avoid - - restaging everything. Only links that were created since - - the last time the daemon was running are staged. - - (If the daemon has never ran before, avoid staging - - links too.) - -} - ensurestaged link daemonstatus - | scanComplete daemonstatus = addlink link - | otherwise = case filestatus of - Just s - | safe (statusChangeTime s) -> noChange - _ -> addlink link - where - safe t = maybe True (> t) (lastRunning daemonstatus) - - {- For speed, tries to reuse the existing blob for - - the symlink target. -} - addlink link = do - v <- catObjectDetails $ Ref $ ':':file - case v of - Just (currlink, sha) - | s2w8 link == L.unpack currlink -> - stageSymlink file sha - _ -> do - sha <- inRepo $ - Git.HashObject.hashObject BlobObject link - stageSymlink file sha - madeChange file "link" - -onDel :: Handler -onDel file _ _dstatus = do - Annex.Queue.addUpdateIndex =<< - inRepo (Git.UpdateIndex.unstageFile file) - madeChange file "rm" - -{- A directory has been deleted, or moved, so tell git to remove anything - - that was inside it from its cache. Since it could reappear at any time, - - use --cached to only delete it from the index. - - - - Note: This could use unstageFile, but would need to run another git - - command to get the recursive list of files in the directory, so rm is - - just as good. -} -onDelDir :: Handler -onDelDir dir _ _dstatus = do - Annex.Queue.addCommand "rm" - [Params "--quiet -r --cached --ignore-unmatch --"] [dir] - madeChange dir "rmdir" - -{- Called when there's an error with inotify. -} -onErr :: Handler -onErr msg _ _dstatus = do - warning msg - return Nothing - -{- Adds a symlink to the index, without ever accessing the actual symlink - - on disk. -} -stageSymlink :: FilePath -> Sha -> Annex () -stageSymlink file sha = - Annex.Queue.addUpdateIndex =<< - inRepo (Git.UpdateIndex.stageSymlink file sha) - -{- Gets all unhandled changes. - - Blocks until at least one change is made. -} -getChanges :: ChangeChan -> IO [Change] -getChanges chan = runChangeChan $ do - c <- readTChan chan - go [c] - where - go l = do - v <- tryReadTChan chan - case v of - Nothing -> return l - Just c -> go (c:l) - -{- Puts unhandled changes back into the channel. - - Note: Original order is not preserved. -} -refillChanges :: ChangeChan -> [Change] -> IO () -refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs - -{- This thread makes git commits at appropriate times. -} -commitThread :: MVar Annex.AnnexState -> ChangeChan -> IO () -commitThread st changechan = forever $ do - -- First, a simple rate limiter. - threadDelay oneSecond - -- Next, wait until at least one change has been made. - cs <- getChanges changechan - -- Now see if now's a good time to commit. - time <- getCurrentTime - if shouldCommit time cs - then void $ tryIO $ runStateMVar st commitStaged - else refillChanges changechan cs - where - oneSecond = 1000000 -- microseconds - -commitStaged :: Annex () -commitStaged = do - Annex.Queue.flush - inRepo $ Git.Command.run "commit" - [ Param "--allow-empty-message" - , Param "-m", Param "" - -- Empty commits may be made if tree changes cancel - -- each other out, etc - , Param "--allow-empty" - -- Avoid running the usual git-annex pre-commit hook; - -- watch does the same symlink fixing, and we don't want - -- to deal with unlocked files in these commits. - , Param "--quiet" - ] - -{- Decide if now is a good time to make a commit. - - Note that the list of change times has an undefined order. - - - - Current strategy: If there have been 10 commits within the past second, - - a batch activity is taking place, so wait for later. - -} -shouldCommit :: UTCTime -> [Change] -> Bool -shouldCommit now changes - | len == 0 = False - | len > 10000 = True -- avoid bloating queue too much - | length (filter thisSecond changes) < 10 = True - | otherwise = False -- batch activity - where - len = length changes - thisSecond c = now `diffUTCTime` changeTime c <= 1 |