diff options
-rw-r--r-- | Assistant.hs | 4 | ||||
-rw-r--r-- | Assistant/Common.hs | 21 | ||||
-rw-r--r-- | Assistant/Threads/Committer.hs | 23 | ||||
-rw-r--r-- | Assistant/Threads/Merger.hs | 17 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 15 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 28 | ||||
-rw-r--r-- | Assistant/Threads/SanityChecker.hs | 15 | ||||
-rw-r--r-- | Assistant/Threads/TransferWatcher.hs | 25 | ||||
-rw-r--r-- | Assistant/Threads/Transferrer.hs | 13 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 48 | ||||
-rw-r--r-- | Option.hs | 12 |
11 files changed, 176 insertions, 45 deletions
diff --git a/Assistant.hs b/Assistant.hs index dc36957da..4bb1ed4ce 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -82,7 +82,7 @@ module Assistant where -import Common.Annex +import Assistant.Common import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Changes @@ -136,6 +136,8 @@ startDaemon assistant foreground , mountWatcherThread st dstatus , watchThread st dstatus transferqueue changechan ] + debug "assistant" + ["all git-annex assistant threads started"] waitForTermination stopDaemon :: Annex () diff --git a/Assistant/Common.hs b/Assistant/Common.hs new file mode 100644 index 000000000..c1a346e75 --- /dev/null +++ b/Assistant/Common.hs @@ -0,0 +1,21 @@ +{- Common infrastructure for the git-annex assistant threads. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Common ( + module X, + ThreadName, + debug +) where + +import Common.Annex as X + +import System.Log.Logger + +type ThreadName = String + +debug :: ThreadName -> [String] -> IO () +debug threadname ws = debugM threadname $ unwords $ (threadname ++ ":") : ws diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index ff5cc9eab..ffb249404 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -7,7 +7,7 @@ module Assistant.Threads.Committer where -import Common.Annex +import Assistant.Common import Assistant.Changes import Assistant.Commits import Assistant.ThreadedMonad @@ -31,6 +31,9 @@ import Data.Tuple.Utils import qualified Data.Set as S import Data.Either +thisThread :: ThreadName +thisThread = "Committer" + {- This thread makes git commits at appropriate times. -} commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO () commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do @@ -45,10 +48,24 @@ commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds readychanges <- handleAdds st changechan transferqueue dstatus changes if shouldCommit time readychanges then do + debug thisThread + [ "committing" + , show (length readychanges) + , "changes" + ] void $ tryIO $ runThreadState st commitStaged recordCommit commitchan (Commit time) - else refillChanges changechan readychanges - else refillChanges changechan changes + else refill readychanges + else refill changes + where + refill cs = do + debug thisThread + [ "delaying commit of" + , show (length cs) + , "changes" + ] + refillChanges changechan cs + commitStaged :: Annex () commitStaged = do diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs index c7da86a8d..10ea34692 100644 --- a/Assistant/Threads/Merger.hs +++ b/Assistant/Threads/Merger.hs @@ -5,9 +5,12 @@ - Licensed under the GNU GPL version 3 or higher. -} -module Assistant.Threads.Merger where +module Assistant.Threads.Merger ( + mergeThread, + manualPull, +) where -import Common.Annex +import Assistant.Common import Assistant.ThreadedMonad import Utility.DirWatcher import Utility.Types.DirWatcher @@ -19,6 +22,9 @@ import qualified Git.Branch import qualified Command.Sync import qualified Remote +thisThread :: ThreadName +thisThread = "Merger" + {- This thread watches for changes to .git/refs/heads/synced/*, - which indicate incoming pushes. It merges those pushes into the - currently checked out branch. -} @@ -33,6 +39,7 @@ mergeThread st = do , errHook = hook onErr } void $ watchDir dir (const False) hooks id + debug thisThread ["watching", dir] type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO () @@ -68,7 +75,11 @@ onAdd g file _ let changedbranch = Git.Ref $ "refs" </> "heads" </> takeFileName file current <- Git.Branch.current g - when (Just changedbranch == current) $ + when (Just changedbranch == current) $ do + liftIO $ debug thisThread + [ "merging changes into" + , show current + ] void $ mergeBranch changedbranch g mergeBranch :: Git.Ref -> Git.Repo -> IO Bool diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index 863653351..52614c32a 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -10,7 +10,7 @@ module Assistant.Threads.MountWatcher where -import Common.Annex +import Assistant.Common import Assistant.ThreadedMonad import Assistant.DaemonStatus import Utility.ThreadScheduler @@ -19,7 +19,6 @@ import Utility.Mounts import Control.Concurrent import qualified Control.Exception as E import qualified Data.Set as S -import System.Log.Logger #if WITH_DBUS import DBus.Client @@ -29,6 +28,9 @@ import Data.Word (Word32) #warning Building without dbus support; will use mtab polling #endif +thisThread :: ThreadName +thisThread = "MountWatcher" + mountWatcherThread :: ThreadState -> DaemonStatusHandle -> IO () mountWatcherThread st handle = #if WITH_DBUS @@ -89,7 +91,7 @@ checkMountMonitor client = do if null running then startOneService client startableservices else do - myDebug [ "Using running DBUS service" + debug thisThread [ "Using running DBUS service" , Prelude.head running , "to monitor mount events." ] @@ -107,7 +109,7 @@ startOneService client (x:xs) = do [toVariant x, toVariant (0 :: Word32)] ifM (elem x <$> listServiceNames client) ( do - myDebug [ "Started DBUS service" + debug thisThread [ "Started DBUS service" , x , "to monitor mount events." ] @@ -145,7 +147,7 @@ handleMounts st handle wasmounted nowmounted = mapM_ (handleMount st handle) $ handleMount :: ThreadState -> DaemonStatusHandle -> Mntent -> IO () handleMount st handle mntent = do - myDebug ["detected mount of", mnt_dir mntent] + debug thisThread ["detected mount of", mnt_dir mntent] type MountPoints = S.Set Mntent @@ -156,6 +158,3 @@ currentMountPoints = S.fromList <$> getMounts {- Finds new mount points, given an old and a new set. -} newMountPoints :: MountPoints -> MountPoints -> MountPoints newMountPoints old new = S.difference new old - -myDebug :: [String] -> IO () -myDebug ms = debugM "MountWatcher" $ unwords ("MountWatcher:":ms) diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 6d6836120..e5191109c 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -7,7 +7,7 @@ module Assistant.Threads.Pusher where -import Common.Annex +import Assistant.Common import Assistant.Commits import Assistant.Pushes import Assistant.DaemonStatus @@ -20,6 +20,9 @@ import Utility.Parallel import Data.Time.Clock import qualified Data.Map as M +thisThread :: ThreadName +thisThread = "Pusher" + {- This thread retries pushes that failed before. -} pushRetryThread :: ThreadState -> FailedPushMap -> IO () pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do @@ -27,6 +30,11 @@ pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do -- pushes to retry. topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) unless (null topush) $ do + debug thisThread + [ "retrying" + , show (length topush) + , "failed pushes" + ] now <- getCurrentTime pushToRemotes now st pushmap topush where @@ -46,7 +54,13 @@ pushThread st daemonstatus commitchan pushmap = do remotes <- runThreadState st $ knownRemotes <$> getDaemonStatus daemonstatus pushToRemotes now st pushmap remotes - else refillCommits commitchan commits + else do + debug thisThread + [ "delaying push of" + , show (length commits) + , "commits" + ] + refillCommits commitchan commits {- Decide if now is a good time to push to remotes. - @@ -71,11 +85,20 @@ pushToRemotes now st pushmap remotes = do go True branch g remotes where go shouldretry branch g rs = do + debug thisThread + [ "pushing to" + , show rs + ] Command.Sync.updateBranch (Command.Sync.syncBranch branch) g (succeeded, failed) <- inParallel (push g branch) rs changeFailedPushMap pushmap $ \m -> M.union (makemap failed) $ M.difference m (makemap succeeded) + unless (null failed) $ + debug thisThread + [ "failed to push to" + , show failed + ] unless (null failed || not shouldretry) $ retry branch g failed @@ -86,5 +109,6 @@ pushToRemotes now st pushmap remotes = do ( exitSuccess, exitFailure) retry branch g rs = do + debug thisThread [ "trying manual pull to resolve failed pushes" ] runThreadState st $ manualPull branch rs go False branch g rs diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs index c5b99863e..09aee0797 100644 --- a/Assistant/Threads/SanityChecker.hs +++ b/Assistant/Threads/SanityChecker.hs @@ -9,22 +9,27 @@ module Assistant.Threads.SanityChecker ( sanityCheckerThread ) where -import Common.Annex -import qualified Git.LsFiles +import Assistant.Common import Assistant.DaemonStatus import Assistant.ThreadedMonad import Assistant.Changes import Assistant.TransferQueue +import qualified Git.LsFiles import Utility.ThreadScheduler import qualified Assistant.Threads.Watcher as Watcher import Data.Time.Clock.POSIX +thisThread :: ThreadName +thisThread = "SanityChecker" + {- This thread wakes up occasionally to make sure the tree is in good shape. -} sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO () sanityCheckerThread st status transferqueue changechan = forever $ do waitForNextCheck st status + debug thisThread ["starting sanity check"] + runThreadState st $ modifyDaemonStatus_ status $ \s -> s { sanityCheckRunning = True } @@ -38,6 +43,9 @@ sanityCheckerThread st status transferqueue changechan = forever $ do { sanityCheckRunning = False , lastSanityCheck = Just now } + + debug thisThread ["sanity check complete"] + {- Only run one check per day, from the time of the last check. -} waitForNextCheck :: ThreadState -> DaemonStatusHandle -> IO () @@ -80,5 +88,6 @@ check st status transferqueue changechan = do insanity m = runThreadState st $ warning m addsymlink file s = do insanity $ "found unstaged symlink: " ++ file - Watcher.runHandler st status transferqueue changechan + Watcher.runHandler thisThread st status + transferqueue changechan Watcher.onAddSymlink file s diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index 364ce0468..be520aaf9 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -7,7 +7,7 @@ module Assistant.Threads.TransferWatcher where -import Common.Annex +import Assistant.Common import Assistant.ThreadedMonad import Assistant.DaemonStatus import Logs.Transfer @@ -16,6 +16,9 @@ import Utility.Types.DirWatcher import Data.Map as M +thisThread :: ThreadName +thisThread = "TransferWatcher" + {- This thread watches for changes to the gitAnnexTransferDir, - and updates the DaemonStatus's map of ongoing transfers. -} transferWatcherThread :: ThreadState -> DaemonStatusHandle -> IO () @@ -30,6 +33,7 @@ transferWatcherThread st dstatus = do , errHook = hook onErr } void $ watchDir dir (const False) hooks id + debug thisThread ["watching for transfers"] type Handler = ThreadState -> DaemonStatusHandle -> FilePath -> Maybe FileStatus -> IO () @@ -51,11 +55,17 @@ onErr _ _ msg _ = error msg onAdd :: Handler onAdd st dstatus file _ = case parseTransferFile file of Nothing -> noop - Just t -> runThreadState st $ go t =<< checkTransfer t + Just t -> do + runThreadState st $ go t =<< checkTransfer t where go _ Nothing = noop -- transfer already finished - go t (Just info) = adjustTransfers dstatus $ - M.insertWith' merge t info + go t (Just info) = do + liftIO $ debug thisThread + [ "transfer starting:" + , show t + ] + adjustTransfers dstatus $ + M.insertWith' merge t info -- preseve transferTid, which is not written to disk merge new old = new { transferTid = transferTid old } @@ -63,4 +73,9 @@ onAdd st dstatus file _ = case parseTransferFile file of onDel :: Handler onDel st dstatus file _ = case parseTransferFile file of Nothing -> noop - Just t -> void $ runThreadState st $ removeTransfer dstatus t + Just t -> do + debug thisThread + [ "transfer finishing:" + , show t + ] + void $ runThreadState st $ removeTransfer dstatus t diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index c439d8b7e..4ee5290e1 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -7,7 +7,7 @@ module Assistant.Threads.Transferrer where -import Common.Annex +import Assistant.Common import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.TransferQueue @@ -22,6 +22,9 @@ import Data.Time.Clock.POSIX import Data.Time.Clock import qualified Data.Map as M +thisThread :: ThreadName +thisThread = "Transferrer" + {- For now only one transfer is run at a time. -} maxTransfers :: Int maxTransfers = 1 @@ -32,8 +35,12 @@ transfererThread st dstatus transferqueue slots = go where go = do (t, info) <- getNextTransfer transferqueue - whenM (runThreadState st $ shouldTransfer dstatus t info) $ - runTransfer st dstatus slots t info + ifM (runThreadState st $ shouldTransfer dstatus t info) + ( do + debug thisThread [ "Transferring:" , show t ] + runTransfer st dstatus slots t info + , debug thisThread [ "Skipping unnecessary transfer:" , show t ] + ) go {- Checks if the requested transfer is already running, or diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index ae4fafb78..617e6d77c 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -5,9 +5,16 @@ - Licensed under the GNU GPL version 3 or higher. -} -module Assistant.Threads.Watcher where - -import Common.Annex +module Assistant.Threads.Watcher ( + watchThread, + checkCanWatch, + needLsof, + stageSymlink, + onAddSymlink, + runHandler, +) where + +import Assistant.Common import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Changes @@ -30,6 +37,9 @@ import Git.Types import Data.Bits.Utils import qualified Data.ByteString.Lazy as L +thisThread :: ThreadName +thisThread = "Watcher" + checkCanWatch :: Annex () checkCanWatch | canWatch = @@ -46,10 +56,12 @@ needLsof = error $ unlines ] watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO () -watchThread st dstatus transferqueue changechan = void $ watchDir "." ignored hooks startup +watchThread st dstatus transferqueue changechan = do + void $ watchDir "." ignored hooks startup + debug thisThread [ "watching", "."] where startup = statupScan st dstatus - hook a = Just $ runHandler st dstatus transferqueue changechan a + hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a hooks = WatchHooks { addHook = hook onAdd , delHook = hook onDel @@ -82,22 +94,22 @@ ignored = ig . takeFileName ig ".gitattributes" = True ig _ = False -type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change) +type Handler = ThreadName -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change) {- Runs an action handler, inside the Annex monad, and if there was a - change, adds it to the ChangeChan. - - Exceptions are ignored, otherwise a whole watcher thread could be crashed. -} -runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler st dstatus transferqueue changechan handler file filestatus = void $ do +runHandler :: ThreadName -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler threadname st dstatus transferqueue changechan handler file filestatus = void $ do r <- tryIO go case r of Left e -> print e Right Nothing -> noop Right (Just change) -> recordChange changechan change where - go = runThreadState st $ handler file filestatus dstatus transferqueue + go = runThreadState st $ handler threadname file filestatus dstatus transferqueue {- During initial directory scan, this will be run for any regular files - that are already checked into git. We don't want to turn those into @@ -118,7 +130,7 @@ runHandler st dstatus transferqueue changechan handler file filestatus = void $ - the add. -} onAdd :: Handler -onAdd file filestatus dstatus _ +onAdd threadname file filestatus dstatus _ | maybe False isRegularFile filestatus = do ifM (scanComplete <$> getDaemonStatus dstatus) ( go @@ -129,14 +141,16 @@ onAdd file filestatus dstatus _ ) | otherwise = noChange where - go = pendingAddChange =<< Command.Add.lockDown file + go = do + liftIO $ debug threadname ["file added", file] + pendingAddChange =<< Command.Add.lockDown file {- A symlink might be an arbitrary symlink, which is just added. - Or, if it is a git-annex symlink, ensure it points to the content - before adding it. -} onAddSymlink :: Handler -onAddSymlink file filestatus dstatus transferqueue = go =<< Backend.lookupFile file +onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.lookupFile file where go (Just (key, _)) = do link <- calcGitLink file key @@ -146,6 +160,7 @@ onAddSymlink file filestatus dstatus transferqueue = go =<< Backend.lookupFile f checkcontent key s ensurestaged link s , do + liftIO $ debug threadname ["fix symlink", file] liftIO $ removeFile file liftIO $ createSymbolicLink link file addlink link @@ -175,6 +190,7 @@ onAddSymlink file filestatus dstatus transferqueue = go =<< Backend.lookupFile f {- For speed, tries to reuse the existing blob for - the symlink target. -} addlink link = do + liftIO $ debug threadname ["add symlink", file] v <- catObjectDetails $ Ref $ ':':file case v of Just (currlink, sha) @@ -195,7 +211,8 @@ onAddSymlink file filestatus dstatus transferqueue = go =<< Backend.lookupFile f | otherwise = noop onDel :: Handler -onDel file _ _dstatus _ = do +onDel threadname file _ _dstatus _ = do + liftIO $ debug threadname ["file deleted", file] Annex.Queue.addUpdateIndex =<< inRepo (Git.UpdateIndex.unstageFile file) madeChange file RmChange @@ -208,14 +225,15 @@ onDel file _ _dstatus _ = do - command to get the recursive list of files in the directory, so rm is - just as good. -} onDelDir :: Handler -onDelDir dir _ _dstatus _ = do +onDelDir threadname dir _ _dstatus _ = do + liftIO $ debug threadname ["directory deleted", dir] Annex.Queue.addCommand "rm" [Params "--quiet -r --cached --ignore-unmatch --"] [dir] madeChange dir RmDirChange {- Called when there's an error with inotify. -} onErr :: Handler -onErr msg _ _dstatus _ = do +onErr _ msg _ _dstatus _ = do warning msg return Nothing @@ -17,6 +17,9 @@ module Option ( import System.Console.GetOpt import System.Log.Logger +import System.Log.Formatter +import System.Log.Handler (setFormatter, LogHandler) +import System.Log.Handler.Simple import Common.Annex import qualified Annex @@ -48,8 +51,13 @@ common = setfast v = Annex.changeState $ \s -> s { Annex.fast = v } setauto v = Annex.changeState $ \s -> s { Annex.auto = v } setforcebackend v = Annex.changeState $ \s -> s { Annex.forcebackend = Just v } - setdebug = liftIO $ updateGlobalLogger rootLoggerName $ - setLevel DEBUG + setdebug = liftIO $ do + s <- simpledebug + updateGlobalLogger rootLoggerName + (setLevel DEBUG . setHandlers [s]) + simpledebug = setFormatter + <$> streamHandler stderr DEBUG + <*> pure (simpleLogFormatter "[$time] $msg") matcher :: [Option] matcher = |