summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-06-13 12:36:33 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-06-13 12:46:39 -0400
commitccc50052453ccaf2db0c371c5c36b5eea3e9191a (patch)
treedfb2ce0b7bfed5c8490cda68b6cd733bef72c473 /Assistant
parentc31ddeda84542414dd58e03473a23a6de8890390 (diff)
reorganize
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/Committer.hs104
-rw-r--r--Assistant/DaemonStatus.hs35
-rw-r--r--Assistant/ThreadedMonad.hs40
-rw-r--r--Assistant/Watcher.hs206
4 files changed, 385 insertions, 0 deletions
diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs
new file mode 100644
index 000000000..d6fc08579
--- /dev/null
+++ b/Assistant/Committer.hs
@@ -0,0 +1,104 @@
+{- git-annex assistant change tracking and committing
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Assistant.Committer where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import qualified Annex.Queue
+import qualified Git.Command
+
+import Control.Concurrent
+import Control.Concurrent.STM
+import Data.Time.Clock
+
+{- Channel carrying Change records. runHandler writes to it, and the
+ - commit thread drains it using getChanges. -}
+type ChangeChan = TChan Change
+
+{- A single change that needs to get committed. -}
+data Change = Change
+ -- time at which the change was recorded (filled in by madeChange)
+ { changeTime :: UTCTime
+ -- file or directory that the change applies to
+ , changeFile :: FilePath
+ -- short description of the change, eg "link", "rm" or "rmdir"
+ , changeDesc :: String
+ }
+ deriving (Show)
+
+{- Runs a STM transaction involving a ChangeChan atomically. -}
+runChangeChan :: STM a -> IO a
+runChangeChan transaction = atomically transaction
+
+{- Creates a new, empty ChangeChan. -}
+newChangeChan :: IO ChangeChan
+newChangeChan = newTChanIO
+
+{- Handlers call this when they have made a change that needs to get
+ - committed. The returned Change is stamped with the current time. -}
+madeChange :: FilePath -> String -> Annex (Maybe Change)
+madeChange file desc = do
+    -- Just in case the commit thread is not flushing the queue fast enough.
+    Annex.Queue.flushWhenFull
+    now <- liftIO getCurrentTime
+    return $ Just $ Change now file desc
+
+{- Handlers return this when they made no change that needs to get
+ - committed. -}
+noChange :: Annex (Maybe Change)
+noChange = return Nothing
+
+{- Gets all unhandled changes, in a single transaction.
+ - Blocks until at least one change is made.
+ -
+ - Changes are consed onto the result as they are read, so callers
+ - should treat the order of the list as undefined. -}
+getChanges :: ChangeChan -> IO [Change]
+getChanges chan = runChangeChan $ readTChan chan >>= drain . (: [])
+  where
+    -- Keep reading until the channel has nothing more queued.
+    drain acc = tryReadTChan chan >>= maybe (return acc) (drain . (: acc))
+
+{- Writes changes that were not handled back to the channel, in a single
+ - transaction.
+ - Note: The changes are written in list order, which is not the order
+ - they were originally made in. -}
+refillChanges :: ChangeChan -> [Change] -> IO ()
+refillChanges chan = runChangeChan . mapM_ (writeTChan chan)
+
+{- This thread makes git commits at appropriate times. It loops forever. -}
+commitThread :: ThreadState -> ChangeChan -> IO ()
+commitThread st changechan = forever cycle1
+  where
+    cycle1 = do
+        -- A simple rate limiter: at most one pass per second.
+        threadDelay oneSecond
+        -- Blocks until at least one change has been made.
+        pending <- getChanges changechan
+        -- Either commit now, or put the changes back for a later pass.
+        now <- getCurrentTime
+        if shouldCommit now pending
+            then void $ tryIO $ runThreadState st commitStaged
+            else refillChanges changechan pending
+    oneSecond = 1000000 -- microseconds
+
+{- Flushes the queue of pending git commands, and then commits whatever
+ - is staged, using an empty commit message. -}
+commitStaged :: Annex ()
+commitStaged = do
+    Annex.Queue.flush
+    inRepo $ Git.Command.run "commit"
+        [ Param "--allow-empty-message"
+        , Param "-m", Param ""
+        -- Empty commits may be made if tree changes cancel
+        -- each other out, etc
+        , Param "--allow-empty"
+        , Param "--quiet"
+        -- Avoid running the usual git-annex pre-commit hook;
+        -- watch does the same symlink fixing, and we don't want
+        -- to deal with unlocked files in these commits.
+        -- (--quiet alone does not skip hooks; --no-verify does.)
+        , Param "--no-verify"
+        ]
+
+{- Decide if now is a good time to make a commit.
+ - Note that the list of changes has an undefined order.
+ -
+ - Current strategy: With no changes, there is nothing to commit.
+ - More than 10000 pending changes are always committed, to avoid
+ - bloating the queue too much. Otherwise, if there have been 10 or more
+ - changes within the past second, a batch activity is taking place,
+ - so wait for later.
+ -}
+shouldCommit :: UTCTime -> [Change] -> Bool
+shouldCommit now changes
+    | null changes = False
+    | pending > 10000 = True
+    | otherwise = recent < 10
+  where
+    pending = length changes
+    recent = length $ filter thisSecond changes
+    thisSecond c = now `diffUTCTime` changeTime c <= 1
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
new file mode 100644
index 000000000..43a8c7e2f
--- /dev/null
+++ b/Assistant/DaemonStatus.hs
@@ -0,0 +1,35 @@
+{- git-annex assistant daemon status
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Assistant.DaemonStatus where
+
+import Common.Annex
+
+import Control.Concurrent
+import System.Posix.Types
+
+{- Status of the daemon, shared between its threads via a
+ - DaemonStatusHandle. -}
+data DaemonStatus = DaemonStatus
+ -- False when the daemon is performing its startup scan
+ { scanComplete :: Bool
+ -- Time when a previous process of the daemon was running ok
+ -- (Nothing in the initial status made by newDaemonStatus)
+ , lastRunning :: Maybe EpochTime
+ }
+
+{- The DaemonStatus is kept in a MVar; it is read with getDaemonStatus
+ - and updated with modifyDaemonStatus. -}
+type DaemonStatusHandle = MVar DaemonStatus
+
+{- Initial status: the startup scan has not completed, and no time of a
+ - previous run is recorded. -}
+newDaemonStatus :: DaemonStatus
+newDaemonStatus = DaemonStatus
+ { scanComplete = False
+ , lastRunning = Nothing
+ }
+
+{- Creates a new DaemonStatusHandle, holding the initial status from
+ - newDaemonStatus. -}
+startDaemonStatus :: IO DaemonStatusHandle
+startDaemonStatus = newMVar newDaemonStatus
+
+{- Reads the current DaemonStatus, leaving it in the handle. -}
+getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
+getDaemonStatus handle = liftIO $ readMVar handle
+
+{- Applies a pure update function to the DaemonStatus in the handle.
+ -
+ - The new status is forced (to weak head normal form) before it is
+ - stored. This way repeated updates do not pile up a chain of
+ - unevaluated thunks in the MVar, and an update function that throws
+ - does so here, where modifyMVar_ puts the previous status back. -}
+modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
+modifyDaemonStatus status a = liftIO $ modifyMVar_ status $ \s -> return $! a s
diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs
new file mode 100644
index 000000000..c4d331f61
--- /dev/null
+++ b/Assistant/ThreadedMonad.hs
@@ -0,0 +1,40 @@
+{- making the Annex monad available across threads
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+{-# LANGUAGE BangPatterns #-}
+
+module Assistant.ThreadedMonad where
+
+import Common.Annex
+import qualified Annex
+
+import Control.Concurrent
+
+{- The Annex state is stored in a MVar, so that threaded actions can access
+ - it. The MVar is created by withThreadState, and Annex actions are run
+ - against it with runThreadState. -}
+type ThreadState = MVar Annex.AnnexState
+
+{- Puts the current Annex state into a fresh MVar, and passes that MVar
+ - to the action.
+ -
+ - Once the action is finished, the state found in the MVar replaces the
+ - current Annex state.
+ -}
+withThreadState :: (ThreadState -> Annex a) -> Annex a
+withThreadState a = do
+    mvar <- liftIO . newMVar =<< Annex.getState id
+    r <- a mvar
+    Annex.changeState . const =<< liftIO (takeMVar mvar)
+    return r
+
+{- Runs an Annex action, using the state from the MVar, and stores the
+ - resulting state back into the MVar.
+ -
+ - This serializes calls by threads.
+ -
+ - If the action throws an exception, modifyMVar puts the starting state
+ - back into the MVar before the exception propagates. Callers such as
+ - runHandler catch and ignore exceptions; without this, the MVar would be
+ - left empty and every later call would block forever. -}
+runThreadState :: ThreadState -> Annex a -> IO a
+runThreadState mvar a = modifyMVar mvar $ \startstate -> do
+    -- Force the result pair before the new state is stored.
+    !(r, newstate) <- Annex.run startstate a
+    return (newstate, r)
diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs
new file mode 100644
index 000000000..19a65db6e
--- /dev/null
+++ b/Assistant/Watcher.hs
@@ -0,0 +1,206 @@
+{- git-annex assistant tree watcher
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE CPP #-}
+
+module Assistant.Watcher where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.Committer
+import Utility.ThreadLock
+import qualified Annex.Queue
+import qualified Command.Add
+import qualified Git.Command
+import qualified Git.UpdateIndex
+import qualified Git.HashObject
+import qualified Git.LsFiles
+import qualified Backend
+import Annex.Content
+import Annex.CatFile
+import Git.Types
+
+import Control.Concurrent.STM
+import Data.Bits.Utils
+import qualified Data.ByteString.Lazy as L
+
+#if defined linux_HOST_OS
+import Utility.Inotify
+import System.INotify
+#endif
+
+{- An action run by runHandler in response to an event from the watcher.
+ -
+ - It is passed the path the event is about (for onErr, the error message
+ - instead), the status of the file when one is available, and the daemon
+ - status handle. It returns the Change to be committed, if any. -}
+type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change)
+
+{- Sets up inotify watches on the current directory tree, wiring each
+ - kind of event to its Handler, and then blocks in waitForTermination.
+ - Only implemented on Linux. -}
+watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
+#if defined linux_HOST_OS
+watchThread st dstatus changechan = withINotify $ \i -> do
+    inAnnex $ showAction "scanning"
+    -- This does not return until the startup scan is done.
+    -- That can take some time for large trees.
+    watchDir i "." (ignored . takeFileName) hooks
+    inAnnex $ modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
+    -- Notice any files that were deleted before inotify
+    -- was started.
+    inAnnex $ do
+        inRepo $ Git.Command.run "add" [Param "--update"]
+        showAction "started"
+    waitForTermination
+  where
+    inAnnex = runThreadState st
+    -- Wraps a Handler so its Change, if any, ends up in the ChangeChan.
+    viaHandler = Just . runHandler st dstatus changechan
+    hooks = WatchHooks
+        { addHook = viaHandler onAdd
+        , delHook = viaHandler onDel
+        , addSymlinkHook = viaHandler onAddSymlink
+        , delDirHook = viaHandler onDelDir
+        , errHook = viaHandler onErr
+        }
+#else
+watchThread = error "so far only available on Linux"
+#endif
+
+{- Files and directories that the watcher does not watch. The name is
+ - compared exactly; watchThread passes in the result of takeFileName. -}
+ignored :: FilePath -> Bool
+ignored = (`elem` [".git", ".gitignore", ".gitattributes"])
+
+{- Runs a Handler, inside the Annex monad, and if it reports a change,
+ - writes it to the ChangeChan.
+ -
+ - Exceptions caught by tryIO are printed and otherwise ignored, since
+ - otherwise a whole watcher thread could be crashed.
+ -}
+runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler st dstatus changechan handler file filestatus =
+    either print record =<< tryIO go
+  where
+    go = runThreadState st $ handler file filestatus dstatus
+    record Nothing = noop
+    record (Just change) = runChangeChan $ writeTChan changechan change
+
+{- Adding a file is tricky; the file has to be replaced with a symlink
+ - but this is race prone, as the symlink could be changed immediately
+ - after creation. To avoid that race, git add is not used to stage the
+ - symlink.
+ -
+ - Inotify will notice the new symlink, so this Handler does not stage it
+ - or return a Change, leaving that to onAddSymlink.
+ -
+ - During the initial directory scan, this will be run for any files that
+ - are already checked into git. We don't want to turn those into symlinks,
+ - so check for that first. This is rather expensive, but only happens
+ - during startup.
+ -}
+onAdd :: Handler
+onAdd file _filestatus dstatus = do
+    scanned <- scanComplete <$> getDaemonStatus dstatus
+    if scanned
+        then add
+        else do
+            -- Empty when git already knows about the file.
+            untracked <- inRepo $ Git.LsFiles.notInRepo False [file]
+            if null untracked
+                then noChange
+                else add
+  where
+    add = do
+        showStart "add" file
+        Command.Add.ingest file >>= finish
+        noChange
+    finish Nothing = showEndFail
+    finish (Just key) = do
+        Command.Add.link file key True
+        showEndOk
+
+{- A symlink might be an arbitrary symlink, which is just added.
+ - Or, if it is a git-annex symlink, ensure it points to the content
+ - before adding it.
+ -
+ - A git-annex symlink is one for which Backend.lookupFile finds a key.
+ - If its target differs from the link calculated by calcGitLink, the
+ - symlink is removed and recreated with the calculated target, and then
+ - staged.
+ -}
+onAddSymlink :: Handler
+onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
+ where
+ go (Just (key, _)) = do
+ link <- calcGitLink file key
+ ifM ((==) link <$> liftIO (readSymbolicLink file))
+ ( ensurestaged link =<< getDaemonStatus dstatus
+ , do
+ liftIO $ removeFile file
+ liftIO $ createSymbolicLink link file
+ addlink link
+ )
+ go Nothing = do -- other symlink
+ link <- liftIO (readSymbolicLink file)
+ ensurestaged link =<< getDaemonStatus dstatus
+
+ {- This is often called on symlinks that are already
+ - staged correctly. A symlink may have been deleted
+ - and is being re-added, or was added when the watcher
+ - was not running. So they're normally restaged to make sure.
+ -
+ - As an optimisation, during the startup scan, avoid
+ - restaging everything. Only links whose status change time
+ - is not older than the last time the daemon was running
+ - are staged.
+ - (If the daemon has never run before, avoid staging
+ - links too.)
+ -
+ - When no file status is available, the link is staged.
+ -}
+ ensurestaged link daemonstatus
+ | scanComplete daemonstatus = addlink link
+ | otherwise = case filestatus of
+ Just s
+ | safe (statusChangeTime s) -> noChange
+ _ -> addlink link
+ where
+ safe t = maybe True (> t) (lastRunning daemonstatus)
+
+ {- For speed, tries to reuse the existing blob for
+ - the symlink target: when the index already has a blob
+ - for the file with the same content as the link, its sha
+ - is staged again; otherwise the link is hashed into a
+ - new blob. -}
+ addlink link = do
+ v <- catObjectDetails $ Ref $ ':':file
+ case v of
+ Just (currlink, sha)
+ | s2w8 link == L.unpack currlink ->
+ stageSymlink file sha
+ _ -> do
+ sha <- inRepo $
+ Git.HashObject.hashObject BlobObject link
+ stageSymlink file sha
+ madeChange file "link"
+
+{- A file has been deleted, so queue an index update that unstages it. -}
+onDel :: Handler
+onDel file _ _dstatus = do
+    unstage <- inRepo $ Git.UpdateIndex.unstageFile file
+    Annex.Queue.addUpdateIndex unstage
+    madeChange file "rm"
+
+{- A directory has been deleted, or moved, so tell git to remove anything
+ - that was inside it from its cache. Since it could reappear at any time,
+ - use --cached to only delete it from the index.
+ -
+ - Note: This could use unstageFile, but would need to run another git
+ - command to get the recursive list of files in the directory, so rm is
+ - just as good. -}
+onDelDir :: Handler
+onDelDir dir _ _dstatus =
+    Annex.Queue.addCommand "rm" rmparams [dir] >> madeChange dir "rmdir"
+  where
+    rmparams = [Params "--quiet -r --cached --ignore-unmatch --"]
+
+{- Called when there's an error with inotify. The message is displayed as
+ - a warning, and there is no change to commit. -}
+onErr :: Handler
+onErr msg _ _dstatus = warning msg >> noChange
+
+{- Queues an index update that adds a symlink with the given blob sha,
+ - without ever accessing the actual symlink on disk. -}
+stageSymlink :: FilePath -> Sha -> Annex ()
+stageSymlink file sha = do
+    stager <- inRepo $ Git.UpdateIndex.stageSymlink file sha
+    Annex.Queue.addUpdateIndex stager