diff options
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/Committer.hs | 104 | ||||
-rw-r--r-- | Assistant/DaemonStatus.hs | 103 | ||||
-rw-r--r-- | Assistant/ThreadedMonad.hs | 40 | ||||
-rw-r--r-- | Assistant/Watcher.hs | 204 |
4 files changed, 451 insertions, 0 deletions
diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs new file mode 100644 index 000000000..d6fc08579 --- /dev/null +++ b/Assistant/Committer.hs @@ -0,0 +1,104 @@ +{- git-annex assistant change tracking and committing + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +module Assistant.Committer where + +import Common.Annex +import Assistant.ThreadedMonad +import qualified Annex.Queue +import qualified Git.Command + +import Control.Concurrent +import Control.Concurrent.STM +import Data.Time.Clock + +type ChangeChan = TChan Change + +data Change = Change + { changeTime :: UTCTime + , changeFile :: FilePath + , changeDesc :: String + } + deriving (Show) + +runChangeChan :: STM a -> IO a +runChangeChan = atomically + +newChangeChan :: IO ChangeChan +newChangeChan = atomically newTChan + +{- Handlers call this when they made a change that needs to get committed. -} +madeChange :: FilePath -> String -> Annex (Maybe Change) +madeChange file desc = do + -- Just in case the commit thread is not flushing the queue fast enough. + Annex.Queue.flushWhenFull + liftIO $ Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc) + +noChange :: Annex (Maybe Change) +noChange = return Nothing + +{- Gets all unhandled changes. + - Blocks until at least one change is made. -} +getChanges :: ChangeChan -> IO [Change] +getChanges chan = runChangeChan $ do + c <- readTChan chan + go [c] + where + go l = do + v <- tryReadTChan chan + case v of + Nothing -> return l + Just c -> go (c:l) + +{- Puts unhandled changes back into the channel. + - Note: Original order is not preserved. -} +refillChanges :: ChangeChan -> [Change] -> IO () +refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs + +{- This thread makes git commits at appropriate times. -} +commitThread :: ThreadState -> ChangeChan -> IO () +commitThread st changechan = forever $ do + -- First, a simple rate limiter. + threadDelay oneSecond + -- Next, wait until at least one change has been made. + cs <- getChanges changechan + -- Now see if now's a good time to commit. + time <- getCurrentTime + if shouldCommit time cs + then void $ tryIO $ runThreadState st commitStaged + else refillChanges changechan cs + where + oneSecond = 1000000 -- microseconds + +commitStaged :: Annex () +commitStaged = do + Annex.Queue.flush + inRepo $ Git.Command.run "commit" + [ Param "--allow-empty-message" + , Param "-m", Param "" + -- Empty commits may be made if tree changes cancel + -- each other out, etc + , Param "--allow-empty" + -- Avoid running the usual git-annex pre-commit hook; + -- watch does the same symlink fixing, and we don't want + -- to deal with unlocked files in these commits. + , Param "--quiet" + ] + +{- Decide if now is a good time to make a commit. + - Note that the list of change times has an undefined order. + - + - Current strategy: If there have been 10 commits within the past second, + - a batch activity is taking place, so wait for later. + -} +shouldCommit :: UTCTime -> [Change] -> Bool +shouldCommit now changes + | len == 0 = False + | len > 10000 = True -- avoid bloating queue too much + | length (filter thisSecond changes) < 10 = True + | otherwise = False -- batch activity + where + len = length changes + thisSecond c = now `diffUTCTime` changeTime c <= 1 diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs new file mode 100644 index 000000000..eb8ff256b --- /dev/null +++ b/Assistant/DaemonStatus.hs @@ -0,0 +1,103 @@ +{- git-annex assistant daemon status + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +module Assistant.DaemonStatus where + +import Common.Annex +import Utility.TempFile +import Assistant.ThreadedMonad + +import Control.Concurrent +import System.Posix.Types +import Data.Time.Clock.POSIX +import Data.Time +import System.Locale + +data DaemonStatus = DaemonStatus + -- False when the daemon is performing its startup scan + { scanComplete :: Bool + -- Time when a previous process of the daemon was running ok + , lastRunning :: Maybe POSIXTime + } + deriving (Show) + +type DaemonStatusHandle = MVar DaemonStatus + +newDaemonStatus :: DaemonStatus +newDaemonStatus = DaemonStatus + { scanComplete = False + , lastRunning = Nothing + } + +getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus +getDaemonStatus = liftIO . readMVar + +modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex () +modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a) + +{- Load any previous daemon status file, and store it in the MVar for this + - process to use as its DaemonStatus. -} +startDaemonStatus :: Annex DaemonStatusHandle +startDaemonStatus = do + file <- fromRepo gitAnnexDaemonStatusFile + status <- liftIO $ + catchDefaultIO (readDaemonStatusFile file) newDaemonStatus + liftIO $ newMVar status { scanComplete = False } + +{- This thread wakes up periodically and writes the daemon status to disk. -} +daemonStatusThread :: ThreadState -> DaemonStatusHandle -> IO () +daemonStatusThread st handle = do + checkpoint + forever $ do + threadDelay tenMinutes + checkpoint + where + checkpoint = runThreadState st $ do + file <- fromRepo gitAnnexDaemonStatusFile + status <- getDaemonStatus handle + liftIO $ writeDaemonStatusFile file status + tenMinutes = 10 * 60 * 1000000 -- microseconds + +{- Don't just dump out the structure, because it will change over time, + - and parts of it are not relevant. -} +writeDaemonStatusFile :: FilePath -> DaemonStatus -> IO () +writeDaemonStatusFile file status = + viaTmp writeFile file =<< serialized <$> getPOSIXTime + where + serialized now = unlines + [ "lastRunning:" ++ show now + , "scanComplete:" ++ show (scanComplete status) + ] + +readDaemonStatusFile :: FilePath -> IO DaemonStatus +readDaemonStatusFile file = parse <$> readFile file + where + parse = foldr parseline newDaemonStatus . lines + parseline line status + | key == "lastRunning" = parseval readtime $ \v -> + status { lastRunning = Just v } + | key == "scanComplete" = parseval readish $ \v -> + status { scanComplete = v } + | otherwise = status -- unparsable line + where + (key, value) = separate (== ':') line + parseval parser a = maybe status a (parser value) + readtime s = do + d <- parseTime defaultTimeLocale "%s%Qs" s + Just $ utcTimeToPOSIXSeconds d + +{- Checks if a time stamp was made after the daemon was lastRunning. + - + - Some slop is built in; this really checks if the time stamp was made + - at least ten minutes after the daemon was lastRunning. This is to + - ensure the daemon shut down cleanly, and deal with minor clock skew. + - + - If the daemon has never ran before, this always returns False. + -} +afterLastDaemonRun :: EpochTime -> DaemonStatus -> Bool +afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status) + where + t = realToFrac (timestamp + slop) :: POSIXTime + slop = 10 * 60 diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs new file mode 100644 index 000000000..c4d331f61 --- /dev/null +++ b/Assistant/ThreadedMonad.hs @@ -0,0 +1,40 @@ +{- making the Annex monad available across threads + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +{-# LANGUAGE BangPatterns #-} + +module Assistant.ThreadedMonad where + +import Common.Annex +import qualified Annex + +import Control.Concurrent + +{- The Annex state is stored in a MVar, so that threaded actions can access + - it. -} +type ThreadState = MVar Annex.AnnexState + +{- Stores the Annex state in a MVar. + - + - Once the action is finished, retrieves the state from the MVar. + -} +withThreadState :: (ThreadState -> Annex a) -> Annex a +withThreadState a = do + state <- Annex.getState id + mvar <- liftIO $ newMVar state + r <- a mvar + newstate <- liftIO $ takeMVar mvar + Annex.changeState (const newstate) + return r + +{- Runs an Annex action, using the state from the MVar. + - + - This serializes calls by threads. -} +runThreadState :: ThreadState -> Annex a -> IO a +runThreadState mvar a = do + startstate <- takeMVar mvar + !(r, newstate) <- Annex.run startstate a + putMVar mvar newstate + return r diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs new file mode 100644 index 000000000..ee5bc13af --- /dev/null +++ b/Assistant/Watcher.hs @@ -0,0 +1,204 @@ +{- git-annex assistant tree watcher + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Watcher where + +import Common.Annex +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.Committer +import Utility.ThreadLock +import qualified Annex.Queue +import qualified Command.Add +import qualified Git.Command +import qualified Git.UpdateIndex +import qualified Git.HashObject +import qualified Git.LsFiles +import qualified Backend +import Annex.Content +import Annex.CatFile +import Git.Types + +import Control.Concurrent.STM +import Data.Bits.Utils +import qualified Data.ByteString.Lazy as L + +#if defined linux_HOST_OS +import Utility.Inotify +import System.INotify +#endif + +type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change) + +watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () +#if defined linux_HOST_OS +watchThread st dstatus changechan = withINotify $ \i -> do + runThreadState st $ + showAction "scanning" + -- This does not return until the startup scan is done. + -- That can take some time for large trees. + watchDir i "." (ignored . takeFileName) hooks + runThreadState st $ + modifyDaemonStatus dstatus $ \s -> s { scanComplete = True } + -- Notice any files that were deleted before inotify + -- was started. + runThreadState st $ do + inRepo $ Git.Command.run "add" [Param "--update"] + showAction "started" + waitForTermination + where + hook a = Just $ runHandler st dstatus changechan a + hooks = WatchHooks + { addHook = hook onAdd + , delHook = hook onDel + , addSymlinkHook = hook onAddSymlink + , delDirHook = hook onDelDir + , errHook = hook onErr + } +#else +watchThread = error "so far only available on Linux" +#endif + +ignored :: FilePath -> Bool +ignored ".git" = True +ignored ".gitignore" = True +ignored ".gitattributes" = True +ignored _ = False + +{- Runs an action handler, inside the Annex monad, and if there was a + - change, adds it to the ChangeChan. + - + - Exceptions are ignored, otherwise a whole watcher thread could be crashed. + -} +runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler st dstatus changechan handler file filestatus = void $ do + r <- tryIO go + case r of + Left e -> print e + Right Nothing -> noop + Right (Just change) -> void $ + runChangeChan $ writeTChan changechan change + where + go = runThreadState st $ handler file filestatus dstatus + +{- Adding a file is tricky; the file has to be replaced with a symlink + - but this is race prone, as the symlink could be changed immediately + - after creation. To avoid that race, git add is not used to stage the + - symlink. + - + - Inotify will notice the new symlink, so this Handler does not stage it + - or return a Change, leaving that to onAddSymlink. + - + - During initial directory scan, this will be run for any files that + - are already checked into git. We don't want to turn those into symlinks, + - so do a check. This is rather expensive, but only happens during + - startup. + -} +onAdd :: Handler +onAdd file _filestatus dstatus = do + ifM (scanComplete <$> getDaemonStatus dstatus) + ( go + , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file])) + ( noChange + , go + ) + ) + where + go = do + showStart "add" file + handle =<< Command.Add.ingest file + noChange + handle Nothing = showEndFail + handle (Just key) = do + Command.Add.link file key True + showEndOk + +{- A symlink might be an arbitrary symlink, which is just added. + - Or, if it is a git-annex symlink, ensure it points to the content + - before adding it. + -} +onAddSymlink :: Handler +onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file + where + go (Just (key, _)) = do + link <- calcGitLink file key + ifM ((==) link <$> liftIO (readSymbolicLink file)) + ( ensurestaged link =<< getDaemonStatus dstatus + , do + liftIO $ removeFile file + liftIO $ createSymbolicLink link file + addlink link + ) + go Nothing = do -- other symlink + link <- liftIO (readSymbolicLink file) + ensurestaged link =<< getDaemonStatus dstatus + + {- This is often called on symlinks that are already + - staged correctly. A symlink may have been deleted + - and being re-added, or added when the watcher was + - not running. So they're normally restaged to make sure. + - + - As an optimisation, during the status scan, avoid + - restaging everything. Only links that were created since + - the last time the daemon was running are staged. + - (If the daemon has never ran before, avoid staging + - links too.) + -} + ensurestaged link daemonstatus + | scanComplete daemonstatus = addlink link + | otherwise = case filestatus of + Just s + | not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange + _ -> addlink link + + {- For speed, tries to reuse the existing blob for + - the symlink target. -} + addlink link = do + v <- catObjectDetails $ Ref $ ':':file + case v of + Just (currlink, sha) + | s2w8 link == L.unpack currlink -> + stageSymlink file sha + _ -> do + sha <- inRepo $ + Git.HashObject.hashObject BlobObject link + stageSymlink file sha + madeChange file "link" + +onDel :: Handler +onDel file _ _dstatus = do + Annex.Queue.addUpdateIndex =<< + inRepo (Git.UpdateIndex.unstageFile file) + madeChange file "rm" + +{- A directory has been deleted, or moved, so tell git to remove anything + - that was inside it from its cache. Since it could reappear at any time, + - use --cached to only delete it from the index. + - + - Note: This could use unstageFile, but would need to run another git + - command to get the recursive list of files in the directory, so rm is + - just as good. -} +onDelDir :: Handler +onDelDir dir _ _dstatus = do + Annex.Queue.addCommand "rm" + [Params "--quiet -r --cached --ignore-unmatch --"] [dir] + madeChange dir "rmdir" + +{- Called when there's an error with inotify. -} +onErr :: Handler +onErr msg _ _dstatus = do + warning msg + return Nothing + +{- Adds a symlink to the index, without ever accessing the actual symlink + - on disk. -} +stageSymlink :: FilePath -> Sha -> Annex () +stageSymlink file sha = + Annex.Queue.addUpdateIndex =<< + inRepo (Git.UpdateIndex.stageSymlink file sha) |