diff options
-rw-r--r-- | Assistant.hs | 11 | ||||
-rw-r--r-- | Assistant/Common.hs | 24 | ||||
-rw-r--r-- | Assistant/DaemonStatus.hs | 19 | ||||
-rw-r--r-- | Assistant/Sync.hs | 2 | ||||
-rw-r--r-- | Assistant/Threads/Committer.hs | 5 | ||||
-rw-r--r-- | Assistant/Threads/DaemonStatus.hs | 36 | ||||
-rw-r--r-- | Assistant/Threads/Merger.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/NetWatcher.hs | 27 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 20 | ||||
-rw-r--r-- | Assistant/Threads/SanityChecker.hs | 5 | ||||
-rw-r--r-- | Assistant/Threads/TransferPoller.hs | 5 | ||||
-rw-r--r-- | Assistant/Threads/TransferScanner.hs | 5 | ||||
-rw-r--r-- | Assistant/Threads/TransferWatcher.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/Transferrer.hs | 5 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 4 | ||||
-rw-r--r-- | Assistant/Threads/WebApp.hs | 5 | ||||
-rw-r--r-- | Command/WebApp.hs | 6 |
18 files changed, 133 insertions, 64 deletions
diff --git a/Assistant.hs b/Assistant.hs index 1488a3b82..7f38fdf25 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -110,6 +110,7 @@ import Assistant.Pushes import Assistant.ScanRemotes import Assistant.TransferQueue import Assistant.TransferSlots +import Assistant.Threads.DaemonStatus import Assistant.Threads.Watcher import Assistant.Threads.Committer import Assistant.Threads.Pusher @@ -132,6 +133,8 @@ import Utility.ThreadScheduler import Control.Concurrent +type NamedThread = IO () -> IO (String, IO ()) + stopDaemon :: Annex () stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile @@ -162,7 +165,7 @@ startAssistant assistant daemonize webappwaiter = do transferqueue <- newTransferQueue transferslots <- newTransferSlots scanremotes <- newScanRemoteMap - mapM_ startthread + mapM_ (startthread dstatus) [ watch $ commitThread st changechan commitchan transferqueue dstatus #ifdef WITH_WEBAPP , assist $ webAppThread (Just st) dstatus scanremotes transferqueue transferslots Nothing webappwaiter @@ -177,12 +180,14 @@ startAssistant assistant daemonize webappwaiter = do , assist $ sanityCheckerThread st dstatus transferqueue changechan , assist $ mountWatcherThread st dstatus scanremotes , assist $ netWatcherThread st dstatus scanremotes + , assist $ netWatcherFallbackThread st dstatus scanremotes , assist $ transferScannerThread st dstatus scanremotes transferqueue , watch $ watchThread st dstatus transferqueue changechan ] waitForTermination watch a = (True, a) assist a = (False, a) - startthread (watcher, a) - | watcher || assistant = void $ forkIO a + startthread dstatus (watcher, t) + | watcher || assistant = void $ forkIO $ + runNamedThread dstatus t | otherwise = noop diff --git a/Assistant/Common.hs b/Assistant/Common.hs index c1a346e75..d6df77f69 100644 --- a/Assistant/Common.hs +++ b/Assistant/Common.hs @@ -8,14 +8,38 @@ module Assistant.Common ( module X, ThreadName, + NamedThread(..), + runNamedThread, debug ) where import Common.Annex as X +import Assistant.DaemonStatus +import Assistant.Alert import System.Log.Logger +import qualified Control.Exception as E type ThreadName = String +data NamedThread = NamedThread ThreadName (IO ()) debug :: ThreadName -> [String] -> IO () debug threadname ws = debugM threadname $ unwords $ (threadname ++ ":") : ws + +runNamedThread :: DaemonStatusHandle -> NamedThread -> IO () +runNamedThread dstatus (NamedThread name a) = go + where + go = do + r <- E.try a :: IO (Either E.SomeException ()) + case r of + Right _ -> noop + Left e -> do + let msg = unwords + [ name + , "crashed:" + , show e + ] + hPutStrLn stderr msg + -- TODO click to restart + void $ addAlert dstatus $ + warningAlert name msg diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index b5dd08d0e..c6942530c 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -8,9 +8,7 @@ module Assistant.DaemonStatus where import Common.Annex -import Assistant.ThreadedMonad import Assistant.Alert -import Utility.ThreadScheduler import Utility.TempFile import Utility.NotificationBroadcaster import Logs.Transfer @@ -114,23 +112,6 @@ startDaemonStatus = do , knownRemotes = remotes } -{- This writes the daemon status to disk, when it changes, but no more - - frequently than once every ten minutes. - -} -daemonStatusThread :: ThreadState -> DaemonStatusHandle -> IO () -daemonStatusThread st dstatus = do - notifier <- newNotificationHandle - =<< changeNotifier <$> getDaemonStatus dstatus - checkpoint - runEvery (Seconds tenMinutes) $ do - waitNotification notifier - checkpoint - where - checkpoint = do - status <- getDaemonStatus dstatus - file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile - writeDaemonStatusFile file status - {- Don't just dump out the structure, because it will change over time, - and parts of it are not relevant. -} writeDaemonStatusFile :: FilePath -> DaemonStatus -> IO () diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 9ba7f406f..42863f858 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -42,7 +42,7 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ addScanRemotes scanremotes diverged rs return ok where - (gitremotes, specialremotes) = + (gitremotes, _specialremotes) = partition (Git.repoIsUrl . Remote.repo) rs sync (Just branch) = do diverged <- manualPull st (Just branch) gitremotes diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index 5aadcc02a..fcf205311 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -36,8 +36,8 @@ thisThread :: ThreadName thisThread = "Committer" {- This thread makes git commits at appropriate times. -} -commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO () -commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do +commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread +commitThread st changechan commitchan transferqueue dstatus = thread $ runEvery (Seconds 1) $ do -- We already waited one second as a simple rate limiter. -- Next, wait until at least one change is available for -- processing. @@ -61,6 +61,7 @@ commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds else refill readychanges else refill changes where + thread = NamedThread thisThread refill [] = noop refill cs = do debug thisThread diff --git a/Assistant/Threads/DaemonStatus.hs b/Assistant/Threads/DaemonStatus.hs new file mode 100644 index 000000000..f3174c86f --- /dev/null +++ b/Assistant/Threads/DaemonStatus.hs @@ -0,0 +1,36 @@ +{- git-annex assistant daemon status thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.DaemonStatus where + +import Assistant.Common +import Assistant.DaemonStatus +import Assistant.ThreadedMonad +import Utility.ThreadScheduler +import Utility.NotificationBroadcaster + +thisThread :: ThreadName +thisThread = "DaemonStatus" + +{- This writes the daemon status to disk, when it changes, but no more + - frequently than once every ten minutes. + -} +daemonStatusThread :: ThreadState -> DaemonStatusHandle -> NamedThread +daemonStatusThread st dstatus = thread $ do + notifier <- newNotificationHandle + =<< changeNotifier <$> getDaemonStatus dstatus + checkpoint + runEvery (Seconds tenMinutes) $ do + waitNotification notifier + checkpoint + where + thread = NamedThread thisThread + checkpoint = do + status <- getDaemonStatus dstatus + file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile + writeDaemonStatusFile file status + diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs index 0f33b68ed..663ea40ff 100644 --- a/Assistant/Threads/Merger.hs +++ b/Assistant/Threads/Merger.hs @@ -22,8 +22,8 @@ thisThread = "Merger" {- This thread watches for changes to .git/refs/heads/synced/, - which indicate incoming pushes. It merges those pushes into the - currently checked out branch. -} -mergeThread :: ThreadState -> IO () -mergeThread st = do +mergeThread :: ThreadState -> NamedThread +mergeThread st = thread $ do g <- runThreadState st $ fromRepo id let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced" createDirectoryIfMissing True dir @@ -34,6 +34,8 @@ mergeThread st = do } void $ watchDir dir (const False) hooks id debug thisThread ["watching", dir] + where + thread = NamedThread thisThread type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO () diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index cc7495602..e52dc9093 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -38,13 +38,15 @@ import Data.Word (Word32) thisThread :: ThreadName thisThread = "MountWatcher" -mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -mountWatcherThread st handle scanremotes = +mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread +mountWatcherThread st handle scanremotes = thread $ #if WITH_DBUS dbusThread st handle scanremotes #else pollingThread st handle scanremotes #endif + where + thread = NamedThread thisThread #if WITH_DBUS diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index 6f6bd40da..b52657990 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -15,13 +15,11 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync -import qualified Git import Utility.ThreadScheduler import Remote.List import qualified Types.Remote as Remote import qualified Control.Exception as E -import Control.Concurrent #if WITH_DBUS import Utility.DBus @@ -35,18 +33,27 @@ import Data.Word (Word32) thisThread :: ThreadName thisThread = "NetWatcher" -netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -netWatcherThread st dstatus scanremotes = do +netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread +netWatcherThread st dstatus scanremotes = thread $ do #if WITH_DBUS - void $ forkIO $ dbusThread st dstatus scanremotes + dbusThread st dstatus scanremotes +#else + noop #endif - {- This is a fallback for when dbus cannot be used to detect - - network connection changes, but it also ensures that - - any networked remotes that may have not been routable for a - - while (despite the local network staying up), are synced with - - periodically. -} + where + thread = NamedThread thisThread + +{- This is a fallback for when dbus cannot be used to detect + - network connection changes, but it also ensures that + - any networked remotes that may have not been routable for a + - while (despite the local network staying up), are synced with + - periodically. -} +netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread +netWatcherFallbackThread st dstatus scanremotes = thread $ do runEvery (Seconds 3600) $ handleConnection st dstatus scanremotes + where + thread = NamedThread thisThread #if WITH_DBUS diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 6bf8de2df..dbe968cd7 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -24,8 +24,8 @@ thisThread :: ThreadName thisThread = "Pusher" {- This thread retries pushes that failed before. -} -pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> IO () -pushRetryThread st dstatus pushmap = runEvery (Seconds halfhour) $ do +pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> NamedThread +pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do -- We already waited half an hour, now wait until there are failed -- pushes to retry. topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) @@ -40,10 +40,11 @@ pushRetryThread st dstatus pushmap = runEvery (Seconds halfhour) $ do pushToRemotes thisThread now st (Just pushmap) topush where halfhour = 1800 + thread = NamedThread thisThread {- This thread pushes git commits out to remotes soon after they are made. -} -pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO () -pushThread st dstatus commitchan pushmap = do +pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> NamedThread +pushThread st dstatus commitchan pushmap = thread $ do runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made @@ -64,11 +65,12 @@ pushThread st dstatus commitchan pushmap = do , "commits" ] refillCommits commitchan commits - where - pushable r - | Remote.specialRemote r = False - | Remote.readonly r = False - | otherwise = True + where + thread = NamedThread thisThread + pushable r + | Remote.specialRemote r = False + | Remote.readonly r = False + | otherwise = True {- Decide if now is a good time to push to remotes. - diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs index a7c2189d8..148ae1435 100644 --- a/Assistant/Threads/SanityChecker.hs +++ b/Assistant/Threads/SanityChecker.hs @@ -25,8 +25,8 @@ thisThread :: ThreadName thisThread = "SanityChecker" {- This thread wakes up occasionally to make sure the tree is in good shape. -} -sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO () -sanityCheckerThread st dstatus transferqueue changechan = forever $ do +sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread +sanityCheckerThread st dstatus transferqueue changechan = thread $ forever $ do waitForNextCheck dstatus debug thisThread ["starting sanity check"] @@ -35,6 +35,7 @@ sanityCheckerThread st dstatus transferqueue changechan = forever $ do debug thisThread ["sanity check complete"] where + thread = NamedThread thisThread go = do modifyDaemonStatus_ dstatus $ \s -> s { sanityCheckRunning = True } diff --git a/Assistant/Threads/TransferPoller.hs b/Assistant/Threads/TransferPoller.hs index 79bcb98b5..e31dfb40c 100644 --- a/Assistant/Threads/TransferPoller.hs +++ b/Assistant/Threads/TransferPoller.hs @@ -21,8 +21,8 @@ thisThread = "TransferPoller" {- This thread polls the status of ongoing transfers, determining how much - of each transfer is complete. -} -transferPollerThread :: ThreadState -> DaemonStatusHandle -> IO () -transferPollerThread st dstatus = do +transferPollerThread :: ThreadState -> DaemonStatusHandle -> NamedThread +transferPollerThread st dstatus = thread $ do g <- runThreadState st $ fromRepo id tn <- newNotificationHandle =<< transferNotifier <$> getDaemonStatus dstatus @@ -33,6 +33,7 @@ transferPollerThread st dstatus = do then waitNotification tn -- block until transfers running else mapM_ (poll g) $ M.toList ts where + thread = NamedThread thisThread poll g (t, info) {- Downloads are polled by checking the size of the - temp file being used for the transfer. -} diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs index 87af3567e..ecb9021f0 100644 --- a/Assistant/Threads/TransferScanner.hs +++ b/Assistant/Threads/TransferScanner.hs @@ -31,11 +31,12 @@ thisThread = "TransferScanner" {- This thread waits until a remote needs to be scanned, to find transfers - that need to be made, to keep data in sync. -} -transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO () -transferScannerThread st dstatus scanremotes transferqueue = do +transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> NamedThread +transferScannerThread st dstatus scanremotes transferqueue = thread $ do startupScan go S.empty where + thread = NamedThread thisThread go scanned = do threadDelaySeconds (Seconds 2) (rs, infos) <- unzip <$> getScanRemote scanremotes diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index fe8af9aad..be15aef24 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -20,8 +20,8 @@ thisThread = "TransferWatcher" {- This thread watches for changes to the gitAnnexTransferDir, - and updates the DaemonStatus's map of ongoing transfers. -} -transferWatcherThread :: ThreadState -> DaemonStatusHandle -> IO () -transferWatcherThread st dstatus = do +transferWatcherThread :: ThreadState -> DaemonStatusHandle -> NamedThread +transferWatcherThread st dstatus = thread $ do g <- runThreadState st $ fromRepo id let dir = gitAnnexTransferDir g createDirectoryIfMissing True dir @@ -33,6 +33,8 @@ transferWatcherThread st dstatus = do } void $ watchDir dir (const False) hooks id debug thisThread ["watching for transfers"] + where + thread = NamedThread thisThread type Handler = ThreadState -> DaemonStatusHandle -> FilePath -> Maybe FileStatus -> IO () diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index a2cf08c1c..3c1c44b9d 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -30,9 +30,10 @@ maxTransfers :: Int maxTransfers = 1 {- Dispatches transfers from the queue. -} -transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> IO () -transfererThread st dstatus transferqueue slots = go =<< readProgramFile +transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> NamedThread +transfererThread st dstatus transferqueue slots = thread $ go =<< readProgramFile where + thread = NamedThread thisThread go program = forever $ inTransferSlot dstatus slots $ maybe (return Nothing) (uncurry $ startTransfer st dstatus program) =<< getNextTransfer transferqueue dstatus notrunning diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index 89cc98fa4..3054c50e1 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -56,8 +56,8 @@ needLsof = error $ unlines , "Be warned: This can corrupt data in the annex, and make fsck complain." ] -watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO () -watchThread st dstatus transferqueue changechan = do +watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread +watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do void $ watchDir "." ignored hooks startup debug thisThread [ "watching", "."] where diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs index 7da3c82e5..73a4467cd 100644 --- a/Assistant/Threads/WebApp.hs +++ b/Assistant/Threads/WebApp.hs @@ -50,8 +50,8 @@ webAppThread -> TransferSlots -> Maybe (IO String) -> Maybe (Url -> FilePath -> IO ()) - -> IO () -webAppThread mst dstatus scanremotes transferqueue transferslots postfirstrun onstartup = do + -> NamedThread +webAppThread mst dstatus scanremotes transferqueue transferslots postfirstrun onstartup = thread $ do webapp <- WebApp <$> pure mst <*> pure dstatus @@ -72,6 +72,7 @@ webAppThread mst dstatus scanremotes transferqueue transferslots postfirstrun on Nothing -> withTempFile "webapp.html" $ \tmpfile _ -> go port webapp tmpfile Just st -> go port webapp =<< runThreadState st (fromRepo gitAnnexHtmlShim) where + thread = NamedThread thisThread getreldir Nothing = return Nothing getreldir (Just st) = Just <$> (relHome =<< absPath diff --git a/Command/WebApp.hs b/Command/WebApp.hs index c8a7c7f59..9a3be19d1 100644 --- a/Command/WebApp.hs +++ b/Command/WebApp.hs @@ -10,6 +10,7 @@ module Command.WebApp where import Common.Annex import Command import Assistant +import Assistant.Common import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.TransferQueue @@ -93,8 +94,9 @@ firstRun = do transferslots <- newTransferSlots v <- newEmptyMVar let callback a = Just $ a v - webAppThread Nothing dstatus scanremotes transferqueue transferslots - (callback signaler) (callback mainthread) + void $ runNamedThread dstatus $ + webAppThread Nothing dstatus scanremotes transferqueue transferslots + (callback signaler) (callback mainthread) where signaler v = do putMVar v "" |