summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/Committer.hs104
-rw-r--r--Assistant/DaemonStatus.hs103
-rw-r--r--Assistant/ThreadedMonad.hs40
-rw-r--r--Assistant/Watcher.hs204
4 files changed, 451 insertions, 0 deletions
diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs
new file mode 100644
index 000000000..d6fc08579
--- /dev/null
+++ b/Assistant/Committer.hs
@@ -0,0 +1,104 @@
+{- git-annex assistant change tracking and committing
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Assistant.Committer where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import qualified Annex.Queue
+import qualified Git.Command
+
+import Control.Concurrent
+import Control.Concurrent.STM
+import Data.Time.Clock
+
+type ChangeChan = TChan Change
+
+data Change = Change
+ { changeTime :: UTCTime
+ , changeFile :: FilePath
+ , changeDesc :: String
+ }
+ deriving (Show)
+
+runChangeChan :: STM a -> IO a
+runChangeChan = atomically
+
+newChangeChan :: IO ChangeChan
+newChangeChan = atomically newTChan
+
+{- Handlers call this when they made a change that needs to get committed. -}
+madeChange :: FilePath -> String -> Annex (Maybe Change)
+madeChange file desc = do
+ -- Just in case the commit thread is not flushing the queue fast enough.
+ Annex.Queue.flushWhenFull
+ liftIO $ Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc)
+
+noChange :: Annex (Maybe Change)
+noChange = return Nothing
+
+{- Gets all unhandled changes.
+ - Blocks until at least one change is made, then also takes every
+ - change already queued behind it, without blocking again.
+ - Changes read later end up earlier in the returned list. -}
+getChanges :: ChangeChan -> IO [Change]
+getChanges chan = runChangeChan $ do
+    first <- readTChan chan
+    drain [first]
+  where
+    -- Keeps prepending queued changes until the channel is empty.
+    drain acc = tryReadTChan chan >>=
+        maybe (return acc) (drain . (: acc))
+
+{- Puts unhandled changes back into the channel.
+ - Note: Original order is not preserved. -}
+refillChanges :: ChangeChan -> [Change] -> IO ()
+refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
+
+{- This thread makes git commits at appropriate times. -}
+commitThread :: ThreadState -> ChangeChan -> IO ()
+commitThread st changechan = forever $ do
+ -- First, a simple rate limiter.
+ threadDelay oneSecond
+ -- Next, wait until at least one change has been made.
+ cs <- getChanges changechan
+ -- Now see if now's a good time to commit.
+ time <- getCurrentTime
+ if shouldCommit time cs
+ then void $ tryIO $ runThreadState st commitStaged
+ else refillChanges changechan cs
+ where
+ oneSecond = 1000000 -- microseconds
+
+{- Flushes the queue, then commits whatever is staged in the index. -}
+commitStaged :: Annex ()
+commitStaged = do
+    Annex.Queue.flush
+    inRepo $ Git.Command.run "commit"
+        [ Param "--allow-empty-message", Param "-m", Param ""
+        -- Empty commits may be made if tree changes cancel out, etc
+        , Param "--allow-empty"
+        -- Avoid running the usual git-annex pre-commit hook;
+        -- watch does the same symlink fixing, and we don't want
+        -- to deal with unlocked files in these commits.
+        , Param "--no-verify"
+        , Param "--quiet"
+        ]
+
+{- Decides whether now is a good time to make a commit.
+ - The changes may be listed in any order.
+ -
+ - Current strategy: If 10 or more changes were made within the past
+ - second, a batch activity is taking place, so wait for later.
+ -}
+shouldCommit :: UTCTime -> [Change] -> Bool
+shouldCommit now changes
+    | null changes = False
+    | total > 10000 = True -- avoid bloating queue too much
+    | otherwise = recent < 10 -- 10 or more means batch activity
+  where
+    total = length changes
+    recent = length $ filter isRecent changes
+    isRecent c = diffUTCTime now (changeTime c) <= 1
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
new file mode 100644
index 000000000..eb8ff256b
--- /dev/null
+++ b/Assistant/DaemonStatus.hs
@@ -0,0 +1,103 @@
+{- git-annex assistant daemon status
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Assistant.DaemonStatus where
+
+import Common.Annex
+import Utility.TempFile
+import Assistant.ThreadedMonad
+
+import Control.Concurrent
+import System.Posix.Types
+import Data.Time.Clock.POSIX
+import Data.Time
+import System.Locale
+
+data DaemonStatus = DaemonStatus
+ -- False when the daemon is performing its startup scan
+ { scanComplete :: Bool
+ -- Time when a previous process of the daemon was running ok
+ , lastRunning :: Maybe POSIXTime
+ }
+ deriving (Show)
+
+type DaemonStatusHandle = MVar DaemonStatus
+
+newDaemonStatus :: DaemonStatus
+newDaemonStatus = DaemonStatus
+ { scanComplete = False
+ , lastRunning = Nothing
+ }
+
+getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
+getDaemonStatus = liftIO . readMVar
+
+modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
+modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a)
+
+{- Load any previous daemon status file, and store it in the MVar for this
+ - process to use as its DaemonStatus. -}
+startDaemonStatus :: Annex DaemonStatusHandle
+startDaemonStatus = do
+ file <- fromRepo gitAnnexDaemonStatusFile
+ status <- liftIO $
+ catchDefaultIO (readDaemonStatusFile file) newDaemonStatus
+ liftIO $ newMVar status { scanComplete = False }
+
+{- This thread wakes up periodically and writes the daemon status to disk. -}
+daemonStatusThread :: ThreadState -> DaemonStatusHandle -> IO ()
+daemonStatusThread st handle = do
+ checkpoint
+ forever $ do
+ threadDelay tenMinutes
+ checkpoint
+ where
+ checkpoint = runThreadState st $ do
+ file <- fromRepo gitAnnexDaemonStatusFile
+ status <- getDaemonStatus handle
+ liftIO $ writeDaemonStatusFile file status
+ tenMinutes = 10 * 60 * 1000000 -- microseconds
+
+{- Don't just dump out the structure, because it will change over time,
+ - and parts of it are not relevant. -}
+writeDaemonStatusFile :: FilePath -> DaemonStatus -> IO ()
+writeDaemonStatusFile file status =
+ viaTmp writeFile file =<< serialized <$> getPOSIXTime
+ where
+ serialized now = unlines
+ [ "lastRunning:" ++ show now
+ , "scanComplete:" ++ show (scanComplete status)
+ ]
+
+readDaemonStatusFile :: FilePath -> IO DaemonStatus
+readDaemonStatusFile file = parse <$> readFile file
+ where
+ parse = foldr parseline newDaemonStatus . lines
+ parseline line status
+ | key == "lastRunning" = parseval readtime $ \v ->
+ status { lastRunning = Just v }
+ | key == "scanComplete" = parseval readish $ \v ->
+ status { scanComplete = v }
+ | otherwise = status -- unparsable line
+ where
+ (key, value) = separate (== ':') line
+ parseval parser a = maybe status a (parser value)
+ readtime s = do
+ d <- parseTime defaultTimeLocale "%s%Qs" s
+ Just $ utcTimeToPOSIXSeconds d
+
+{- Checks if a time stamp was made after the daemon was lastRunning.
+ -
+ - Some slop is built in; this really checks if the time stamp was made
+ - later than ten minutes before the daemon was lastRunning. This is to
+ - ensure the daemon shut down cleanly, and deal with minor clock skew.
+ -
+ - If the daemon has never run before, this always returns False.
+ -}
+afterLastDaemonRun :: EpochTime -> DaemonStatus -> Bool
+afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status)
+ where
+ t = realToFrac (timestamp + slop) :: POSIXTime
+ slop = 10 * 60
diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs
new file mode 100644
index 000000000..c4d331f61
--- /dev/null
+++ b/Assistant/ThreadedMonad.hs
@@ -0,0 +1,40 @@
+{- making the Annex monad available across threads
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+{-# LANGUAGE BangPatterns #-}
+
+module Assistant.ThreadedMonad where
+
+import Common.Annex
+import qualified Annex
+
+import Control.Concurrent
+
+{- The Annex state is stored in a MVar, so that threaded actions can access
+ - it. -}
+type ThreadState = MVar Annex.AnnexState
+
+{- Stores the Annex state in a MVar.
+ -
+ - Once the action is finished, retrieves the state from the MVar.
+ -}
+withThreadState :: (ThreadState -> Annex a) -> Annex a
+withThreadState a = do
+ state <- Annex.getState id
+ mvar <- liftIO $ newMVar state
+ r <- a mvar
+ newstate <- liftIO $ takeMVar mvar
+ Annex.changeState (const newstate)
+ return r
+
+{- Runs an Annex action, using the state from the MVar.
+ -
+ - This serializes calls by threads. modifyMVar puts the starting state
+ - back if the action throws, so an exception in one thread does not
+ - leave the MVar empty and block every later caller. -}
+runThreadState :: ThreadState -> Annex a -> IO a
+runThreadState mvar a = modifyMVar mvar $ \startstate -> do
+    !(r, newstate) <- Annex.run startstate a
+    return (newstate, r)
diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs
new file mode 100644
index 000000000..ee5bc13af
--- /dev/null
+++ b/Assistant/Watcher.hs
@@ -0,0 +1,204 @@
+{- git-annex assistant tree watcher
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE CPP #-}
+
+module Assistant.Watcher where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.Committer
+import Utility.ThreadLock
+import qualified Annex.Queue
+import qualified Command.Add
+import qualified Git.Command
+import qualified Git.UpdateIndex
+import qualified Git.HashObject
+import qualified Git.LsFiles
+import qualified Backend
+import Annex.Content
+import Annex.CatFile
+import Git.Types
+
+import Control.Concurrent.STM
+import Data.Bits.Utils
+import qualified Data.ByteString.Lazy as L
+
+#if defined linux_HOST_OS
+import Utility.Inotify
+import System.INotify
+#endif
+
+type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change)
+
+watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
+#if defined linux_HOST_OS
+watchThread st dstatus changechan = withINotify $ \i -> do
+ runThreadState st $
+ showAction "scanning"
+ -- This does not return until the startup scan is done.
+ -- That can take some time for large trees.
+ watchDir i "." (ignored . takeFileName) hooks
+ runThreadState st $
+ modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
+ -- Notice any files that were deleted before inotify
+ -- was started.
+ runThreadState st $ do
+ inRepo $ Git.Command.run "add" [Param "--update"]
+ showAction "started"
+ waitForTermination
+ where
+ hook a = Just $ runHandler st dstatus changechan a
+ hooks = WatchHooks
+ { addHook = hook onAdd
+ , delHook = hook onDel
+ , addSymlinkHook = hook onAddSymlink
+ , delDirHook = hook onDelDir
+ , errHook = hook onErr
+ }
+#else
+watchThread = error "so far only available on Linux"
+#endif
+
+ignored :: FilePath -> Bool
+ignored ".git" = True
+ignored ".gitignore" = True
+ignored ".gitattributes" = True
+ignored _ = False
+
+{- Runs an action handler, inside the Annex monad, and if there was a
+ - change, adds it to the ChangeChan.
+ -
+ - Exceptions caught by tryIO are printed and otherwise ignored, since
+ - otherwise a whole watcher thread could be crashed.
+ -}
+runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler st dstatus changechan handler file filestatus =
+    tryIO invoke >>= either print (maybe noop enqueue)
+  where
+    -- The handler runs in the Annex state that the threads share.
+    invoke = runThreadState st $ handler file filestatus dstatus
+    -- A reported change is written to the channel; Nothing is
+    -- simply dropped.
+    enqueue = runChangeChan . writeTChan changechan
+
+{- Adding a file is tricky; the file has to be replaced with a symlink
+ - but this is race prone, as the symlink could be changed immediately
+ - after creation. To avoid that race, git add is not used to stage the
+ - symlink.
+ -
+ - Inotify will notice the new symlink, so this Handler does not stage it
+ - or return a Change, leaving that to onAddSymlink.
+ -
+ - During initial directory scan, this will be run for any files that
+ - are already checked into git. We don't want to turn those into symlinks,
+ - so do a check. This is rather expensive, but only happens during
+ - startup.
+ -}
+onAdd :: Handler
+onAdd file _filestatus dstatus = do
+ ifM (scanComplete <$> getDaemonStatus dstatus)
+ ( go
+ , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
+ ( noChange
+ , go
+ )
+ )
+ where
+ go = do
+ showStart "add" file
+ handle =<< Command.Add.ingest file
+ noChange
+ handle Nothing = showEndFail
+ handle (Just key) = do
+ Command.Add.link file key True
+ showEndOk
+
+{- A symlink might be an arbitrary symlink, which is just added.
+ - Or, if it is a git-annex symlink, ensure it points to the content
+ - before adding it.
+ -}
+onAddSymlink :: Handler
+onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
+ where
+ go (Just (key, _)) = do
+ link <- calcGitLink file key
+ ifM ((==) link <$> liftIO (readSymbolicLink file))
+ ( ensurestaged link =<< getDaemonStatus dstatus
+ , do
+ liftIO $ removeFile file
+ liftIO $ createSymbolicLink link file
+ addlink link
+ )
+ go Nothing = do -- other symlink
+ link <- liftIO (readSymbolicLink file)
+ ensurestaged link =<< getDaemonStatus dstatus
+
+ {- This is often called on symlinks that are already
+ - staged correctly. A symlink may have been deleted
+ - and then re-added, or added when the watcher was
+ - not running. So they're normally restaged to make sure.
+ -
+ - As an optimisation, during the startup scan, avoid
+ - restaging everything. Only links that were created since
+ - the last time the daemon was running are staged.
+ - (If the daemon has never run before, avoid staging
+ - links too.)
+ -}
+ ensurestaged link daemonstatus
+ | scanComplete daemonstatus = addlink link
+ | otherwise = case filestatus of
+ Just s
+ | not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange
+ _ -> addlink link
+
+ {- For speed, tries to reuse the existing blob for
+ - the symlink target. -}
+ addlink link = do
+ v <- catObjectDetails $ Ref $ ':':file
+ case v of
+ Just (currlink, sha)
+ | s2w8 link == L.unpack currlink ->
+ stageSymlink file sha
+ _ -> do
+ sha <- inRepo $
+ Git.HashObject.hashObject BlobObject link
+ stageSymlink file sha
+ madeChange file "link"
+
+onDel :: Handler
+onDel file _ _dstatus = do
+ Annex.Queue.addUpdateIndex =<<
+ inRepo (Git.UpdateIndex.unstageFile file)
+ madeChange file "rm"
+
+{- A directory has been deleted, or moved, so tell git to remove anything
+ - that was inside it from its cache. Since it could reappear at any time,
+ - use --cached to only delete it from the index.
+ -
+ - Note: This could use unstageFile, but would need to run another git
+ - command to get the recursive list of files in the directory, so rm is
+ - just as good. -}
+onDelDir :: Handler
+onDelDir dir _ _dstatus = do
+ Annex.Queue.addCommand "rm"
+ [Params "--quiet -r --cached --ignore-unmatch --"] [dir]
+ madeChange dir "rmdir"
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr msg _ _dstatus = do
+ warning msg
+ return Nothing
+
+{- Adds a symlink to the index, without ever accessing the actual symlink
+ - on disk. -}
+stageSymlink :: FilePath -> Sha -> Annex ()
+stageSymlink file sha =
+ Annex.Queue.addUpdateIndex =<<
+ inRepo (Git.UpdateIndex.stageSymlink file sha)