diff options
author | Joey Hess <joey@kitenet.net> | 2012-10-29 02:21:04 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-10-29 02:21:04 -0400 |
commit | 579f63b6b756ca51b8f9fe53c3e668500718d91f (patch) | |
tree | 20039581df67e034ef434749d37de41e9802d21d /Assistant | |
parent | 040f68d628120e112e22bfb7100f9650dec940c8 (diff) |
Assistant monad, stage 2.5
Converted several threads to run in the monad.
Added a lot of useful combinators for working with the monad.
Now the monad includes the name of the thread.
Some debugging messages are disabled pending converting other threads.
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/Common.hs | 44 | ||||
-rw-r--r-- | Assistant/DaemonStatus.hs | 4 | ||||
-rw-r--r-- | Assistant/Monad.hs | 40 | ||||
-rw-r--r-- | Assistant/Sync.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/Committer.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/ConfigMonitor.hs | 4 | ||||
-rw-r--r-- | Assistant/Threads/DaemonStatus.hs | 27 | ||||
-rw-r--r-- | Assistant/Threads/Merger.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 8 | ||||
-rw-r--r-- | Assistant/Threads/NetWatcher.hs | 132 | ||||
-rw-r--r-- | Assistant/Threads/PairListener.hs | 2 | ||||
-rw-r--r-- | Assistant/Threads/PushNotifier.hs | 12 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 8 | ||||
-rw-r--r-- | Assistant/Threads/SanityChecker.hs | 114 | ||||
-rw-r--r-- | Assistant/Threads/TransferPoller.hs | 73 | ||||
-rw-r--r-- | Assistant/Threads/TransferScanner.hs | 8 | ||||
-rw-r--r-- | Assistant/Threads/TransferWatcher.hs | 13 | ||||
-rw-r--r-- | Assistant/Threads/Transferrer.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 12 | ||||
-rw-r--r-- | Assistant/Threads/WebApp.hs | 4 |
20 files changed, 273 insertions, 256 deletions
diff --git a/Assistant/Common.hs b/Assistant/Common.hs index a6c6b8935..b46d3342a 100644 --- a/Assistant/Common.hs +++ b/Assistant/Common.hs @@ -10,7 +10,8 @@ module Assistant.Common ( ThreadName, NamedThread(..), runNamedThread, - debug + debug, + brokendebug ) where import Common.Annex as X @@ -22,25 +23,28 @@ import System.Log.Logger import qualified Control.Exception as E type ThreadName = String -data NamedThread = NamedThread ThreadName (IO ()) +data NamedThread = NamedThread ThreadName (Assistant ()) -debug :: ThreadName -> [String] -> IO () -debug threadname ws = debugM threadname $ unwords $ (threadname ++ ":") : ws +brokendebug :: ThreadName -> [String] -> IO () +brokendebug _ _ = noop -- TODO remove this + +debug :: [String] -> Assistant () +debug ws = do + name <- getAssistant threadName + liftIO $ debugM name $ unwords $ (name ++ ":") : ws runNamedThread :: NamedThread -> Assistant () -runNamedThread (NamedThread name a) = liftIO . go =<< getAssistant daemonStatus - where - go dstatus = do - r <- E.try a :: IO (Either E.SomeException ()) - case r of - Right _ -> noop - Left e -> do - let msg = unwords - [ name - , "crashed:" - , show e - ] - hPutStrLn stderr msg - -- TODO click to restart - void $ addAlert dstatus $ - warningAlert name msg +runNamedThread (NamedThread name a) = do + d <- getAssistant id + liftIO . go $ d { threadName = name } + where + go d = do + r <- E.try (runAssistant a d) :: IO (Either E.SomeException ()) + case r of + Right _ -> noop + Left e -> do + let msg = unwords [name, "crashed:", show e] + hPutStrLn stderr msg + -- TODO click to restart + void $ addAlert (daemonStatusHandle d) $ + warningAlert name msg diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index 60b560b90..08cdbaf55 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -181,8 +181,8 @@ adjustTransfersSTM dstatus a = do putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) } {- Alters a transfer's info, if the transfer is in the map. -} -alterTransferInfo :: DaemonStatusHandle -> Transfer -> (TransferInfo -> TransferInfo) -> IO () -alterTransferInfo dstatus t a = updateTransferInfo' dstatus $ M.adjust a t +alterTransferInfo :: Transfer -> (TransferInfo -> TransferInfo) -> DaemonStatusHandle -> IO () +alterTransferInfo t a dstatus = updateTransferInfo' dstatus $ M.adjust a t {- Updates a transfer's info. Adds the transfer to the map if necessary, - or if already present, updates it while preserving the old transferTid, diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs index fa982b45e..ef9e7a4cb 100644 --- a/Assistant/Monad.hs +++ b/Assistant/Monad.hs @@ -13,7 +13,12 @@ module Assistant.Monad ( newAssistantData, runAssistant, getAssistant, - liftAnnex + liftAnnex, + (<~>), + (<<~), + daemonStatus, + asIO, + asIO2, ) where import "mtl" Control.Monad.Reader @@ -43,8 +48,9 @@ instance MonadBase IO Assistant where liftBase = Assistant . liftBase data AssistantData = AssistantData - { threadState :: ThreadState - , daemonStatus :: DaemonStatusHandle + { threadName :: String + , threadState :: ThreadState + , daemonStatusHandle :: DaemonStatusHandle , scanRemoteMap :: ScanRemoteMap , transferQueue :: TransferQueue , transferSlots :: TransferSlots @@ -57,7 +63,8 @@ data AssistantData = AssistantData newAssistantData :: ThreadState -> DaemonStatusHandle -> IO AssistantData newAssistantData st dstatus = AssistantData - <$> pure st + <$> pure "main" + <*> pure st <*> pure dstatus <*> newScanRemoteMap <*> newTransferQueue @@ -81,3 +88,28 @@ liftAnnex :: Annex a -> Assistant a liftAnnex a = do st <- reader threadState liftIO $ runThreadState st a + +{- Runs an IO action, passing it an IO action that runs an Assistant action. -} +(<~>) :: (IO a -> IO b) -> Assistant a -> Assistant b +io <~> a = do + d <- reader id + liftIO $ io $ runAssistant a d + +{- Creates an IO action that will run an Assistant action when run. -} +asIO :: (a -> Assistant b) -> Assistant (a -> IO b) +asIO a = do + d <- reader id + return $ \v -> runAssistant (a v) d + +{- Creates an IO action that will run an Assistant action when run. -} +asIO2 :: (a -> b -> Assistant c) -> Assistant (a -> b -> IO c) +asIO2 a = do + d <- reader id + return $ \v1 v2 -> runAssistant (a v1 v2) d + +{- Runs an IO action on a selected field of the AssistantData. -} +(<<~) :: (a -> IO b) -> (AssistantData -> a) -> Assistant b +io <<~ v = reader v >>= liftIO . io + +daemonStatus :: Assistant DaemonStatus +daemonStatus = getDaemonStatus <<~ daemonStatusHandle diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 8a9cf8985..bd23c7bb4 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -93,7 +93,7 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do where go _ Nothing _ _ _ = return True -- no branch, so nothing to do go shouldretry (Just branch) g u rs = do - debug threadname + brokendebug threadname [ "pushing to" , show rs ] @@ -117,12 +117,12 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do makemap l = M.fromList $ zip l (repeat now) retry branch g u rs = do - debug threadname [ "trying manual pull to resolve failed pushes" ] + brokendebug threadname [ "trying manual pull to resolve failed pushes" ] void $ manualPull st (Just branch) rs go False (Just branch) g u rs fallback branch g u rs = do - debug threadname + brokendebug threadname [ "fallback pushing to" , show rs ] diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index ceb885100..11c6dc584 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -42,7 +42,7 @@ thisThread = "Committer" {- This thread makes git commits at appropriate times. -} commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread -commitThread st changechan commitchan transferqueue dstatus = thread $ do +commitThread st changechan commitchan transferqueue dstatus = thread $ liftIO $ do delayadd <- runThreadState st $ maybe delayaddDefault (Just . Seconds) . readish <$> getConfig (annexConfig "delayadd") "" @@ -58,7 +58,7 @@ commitThread st changechan commitchan transferqueue dstatus = thread $ do readychanges <- handleAdds delayadd st changechan transferqueue dstatus changes if shouldCommit time readychanges then do - debug thisThread + brokendebug thisThread [ "committing" , show (length readychanges) , "changes" @@ -72,7 +72,7 @@ commitThread st changechan commitchan transferqueue dstatus = thread $ do thread = NamedThread thisThread refill [] = noop refill cs = do - debug thisThread + brokendebug thisThread [ "delaying commit of" , show (length cs) , "changes" diff --git a/Assistant/Threads/ConfigMonitor.hs b/Assistant/Threads/ConfigMonitor.hs index 7450d5c6b..2d5df48dd 100644 --- a/Assistant/Threads/ConfigMonitor.hs +++ b/Assistant/Threads/ConfigMonitor.hs @@ -38,7 +38,7 @@ thisThread = "ConfigMonitor" - be detected immediately. -} configMonitorThread :: ThreadState -> DaemonStatusHandle -> BranchChangeHandle -> CommitChan -> NamedThread -configMonitorThread st dstatus branchhandle commitchan = thread $ do +configMonitorThread st dstatus branchhandle commitchan = thread $ liftIO $ do r <- runThreadState st Annex.gitRepo go r =<< getConfigs r where @@ -50,7 +50,7 @@ configMonitorThread st dstatus branchhandle commitchan = thread $ do new <- getConfigs r when (old /= new) $ do let changedconfigs = new `S.difference` old - debug thisThread $ "reloading config" : + brokendebug thisThread $ "reloading config" : map fst (S.toList changedconfigs) reloadConfigs st dstatus changedconfigs {- Record a commit to get this config diff --git a/Assistant/Threads/DaemonStatus.hs b/Assistant/Threads/DaemonStatus.hs index f3174c86f..946bf1b05 100644 --- a/Assistant/Threads/DaemonStatus.hs +++ b/Assistant/Threads/DaemonStatus.hs @@ -9,28 +9,21 @@ module Assistant.Threads.DaemonStatus where import Assistant.Common import Assistant.DaemonStatus -import Assistant.ThreadedMonad import Utility.ThreadScheduler import Utility.NotificationBroadcaster -thisThread :: ThreadName -thisThread = "DaemonStatus" - {- This writes the daemon status to disk, when it changes, but no more - frequently than once every ten minutes. -} -daemonStatusThread :: ThreadState -> DaemonStatusHandle -> NamedThread -daemonStatusThread st dstatus = thread $ do - notifier <- newNotificationHandle - =<< changeNotifier <$> getDaemonStatus dstatus +daemonStatusThread :: NamedThread +daemonStatusThread = NamedThread "DaemonStatus" $ do + notifier <- liftIO . newNotificationHandle + =<< changeNotifier <$> daemonStatus checkpoint - runEvery (Seconds tenMinutes) $ do - waitNotification notifier + runEvery (Seconds tenMinutes) <~> do + liftIO $ waitNotification notifier checkpoint - where - thread = NamedThread thisThread - checkpoint = do - status <- getDaemonStatus dstatus - file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile - writeDaemonStatusFile file status - + where + checkpoint = do + file <- liftAnnex $ fromRepo gitAnnexDaemonStatusFile + liftIO . writeDaemonStatusFile file =<< daemonStatus diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs index e415a7562..152b40361 100644 --- a/Assistant/Threads/Merger.hs +++ b/Assistant/Threads/Merger.hs @@ -25,7 +25,7 @@ thisThread = "Merger" {- This thread watches for changes to .git/refs/, and handles incoming - pushes. -} mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> NamedThread -mergeThread st dstatus transferqueue branchchange = thread $ do +mergeThread st dstatus transferqueue branchchange = thread $ liftIO $ do g <- runThreadState st gitRepo let dir = Git.localGitDir g </> "refs" createDirectoryIfMissing True dir @@ -35,7 +35,7 @@ mergeThread st dstatus transferqueue branchchange = thread $ do , errHook = hook onErr } void $ watchDir dir (const False) hooks id - debug thisThread ["watching", dir] + brokendebug thisThread ["watching", dir] where thread = NamedThread thisThread @@ -81,7 +81,7 @@ onAdd st dstatus transferqueue branchchange file _ changedbranch = fileToBranch file mergecurrent (Just current) | equivBranches changedbranch current = do - liftIO $ debug thisThread + liftIO $ brokendebug thisThread [ "merging" , show changedbranch , "into" diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index afd1c223c..f36bb8874 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -40,7 +40,7 @@ thisThread :: ThreadName thisThread = "MountWatcher" mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread -mountWatcherThread st handle scanremotes pushnotifier = thread $ +mountWatcherThread st handle scanremotes pushnotifier = thread $ liftIO $ #if WITH_DBUS dbusThread st handle scanremotes pushnotifier #else @@ -93,7 +93,7 @@ checkMountMonitor client = do case running of [] -> startOneService client startableservices (service:_) -> do - debug thisThread [ "Using running DBUS service" + brokendebug thisThread [ "Using running DBUS service" , service , "to monitor mount events." ] @@ -111,7 +111,7 @@ startOneService client (x:xs) = do [toVariant x, toVariant (0 :: Word32)] ifM (elem x <$> listServiceNames client) ( do - debug thisThread [ "Started DBUS service" + brokendebug thisThread [ "Started DBUS service" , x , "to monitor mount events." ] @@ -160,7 +160,7 @@ handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted = handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO () handleMount st dstatus scanremotes pushnotifier dir = do - debug thisThread ["detected mount of", dir] + brokendebug thisThread ["detected mount of", dir] reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder st dstatus dir diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index ed64541c3..2af880e02 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -11,9 +11,6 @@ module Assistant.Threads.NetWatcher where import Assistant.Common -import Assistant.ThreadedMonad -import Assistant.DaemonStatus -import Assistant.ScanRemotes import Assistant.Sync import Assistant.Pushes import Utility.ThreadScheduler @@ -29,72 +26,67 @@ import Data.Word (Word32) #warning Building without dbus support; will poll for network connection changes #endif -thisThread :: ThreadName -thisThread = "NetWatcher" - -netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +netWatcherThread :: NamedThread #if WITH_DBUS -netWatcherThread st dstatus scanremotes pushnotifier = thread $ - dbusThread st dstatus scanremotes pushnotifier +netWatcherThread = thread dbusThread #else -netWatcherThread _ _ _ _ = thread noop +netWatcherThread = thread noop #endif - where - thread = NamedThread thisThread + where + thread = NamedThread "NetWatcher" {- This is a fallback for when dbus cannot be used to detect - network connection changes, but it also ensures that - any networked remotes that may have not been routable for a - while (despite the local network staying up), are synced with - periodically. -} -netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread -netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $ - runEvery (Seconds 3600) $ - handleConnection st dstatus scanremotes pushnotifier - where - thread = NamedThread thisThread +netWatcherFallbackThread :: NamedThread +netWatcherFallbackThread = NamedThread "NetWatcherFallback" $ + runEvery (Seconds 3600) <~> handleConnection #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () -dbusThread st dstatus scanremotes pushnotifier = - persistentClient getSystemAddress () onerr go - where - go client = ifM (checkNetMonitor client) - ( do - listenNMConnections client handleconn - listenWicdConnections client handleconn - , do - runThreadState st $ - warning "No known network monitor available through dbus; falling back to polling" - ) - handleconn = do - debug thisThread ["detected network connection"] - notifyRestart pushnotifier - handleConnection st dstatus scanremotes pushnotifier - onerr e _ = do - runThreadState st $ - warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")" - {- Wait, in hope that dbus will come back -} - threadDelaySeconds (Seconds 60) +dbusThread :: Assistant () +dbusThread = do + handleerr <- asIO2 onerr + runclient <- asIO go + liftIO $ persistentClient getSystemAddress () handleerr runclient + where + go client = ifM (checkNetMonitor client) + ( do + listenNMConnections client <~> handleconn + listenWicdConnections client <~> handleconn + , do + liftAnnex $ + warning "No known network monitor available through dbus; falling back to polling" + ) + handleconn = do + debug ["detected network connection"] + notifyRestart <<~ pushNotifier + handleConnection + onerr e _ = do + liftAnnex $ + warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")" + {- Wait, in hope that dbus will come back -} + liftIO $ threadDelaySeconds (Seconds 60) {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor network connections. -} -checkNetMonitor :: Client -> IO Bool +checkNetMonitor :: Client -> Assistant Bool checkNetMonitor client = do - running <- filter (`elem` [networkmanager, wicd]) + running <- liftIO $ filter (`elem` [networkmanager, wicd]) <$> listServiceNames client case running of [] -> return False (service:_) -> do - debug thisThread [ "Using running DBUS service" + debug [ "Using running DBUS service" , service , "to monitor network connection events." ] return True - where - networkmanager = "org.freedesktop.NetworkManager" - wicd = "org.wicd.daemon" + where + networkmanager = "org.freedesktop.NetworkManager" + wicd = "org.wicd.daemon" {- Listens for new NetworkManager connections. -} listenNMConnections :: Client -> IO () -> IO () @@ -102,18 +94,18 @@ listenNMConnections client callback = listen client matcher $ \event -> when (Just True == anyM activeconnection (signalBody event)) $ callback - where - matcher = matchAny - { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active" - , matchMember = Just "PropertiesChanged" - } - nm_connection_activated = toVariant (2 :: Word32) - nm_state_key = toVariant ("State" :: String) - activeconnection v = do - m <- fromVariant v - vstate <- lookup nm_state_key $ dictionaryItems m - state <- fromVariant vstate - return $ state == nm_connection_activated + where + matcher = matchAny + { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active" + , matchMember = Just "PropertiesChanged" + } + nm_connection_activated = toVariant (2 :: Word32) + nm_state_key = toVariant ("State" :: String) + activeconnection v = do + m <- fromVariant v + vstate <- lookup nm_state_key $ dictionaryItems m + state <- fromVariant vstate + return $ state == nm_connection_activated {- Listens for new Wicd connections. -} listenWicdConnections :: Client -> IO () -> IO () @@ -121,21 +113,23 @@ listenWicdConnections client callback = listen client matcher $ \event -> when (any (== wicd_success) (signalBody event)) $ callback - where - matcher = matchAny - { matchInterface = Just "org.wicd.daemon" - , matchMember = Just "ConnectResultsSent" - } - wicd_success = toVariant ("success" :: String) + where + matcher = matchAny + { matchInterface = Just "org.wicd.daemon" + , matchMember = Just "ConnectResultsSent" + } + wicd_success = toVariant ("success" :: String) #endif -handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () -handleConnection st dstatus scanremotes pushnotifier = - reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) - =<< networkRemotes st +handleConnection :: Assistant () +handleConnection = do + d <- getAssistant id + liftIO . reconnectRemotes (threadName d) (threadState d) + (daemonStatusHandle d) (scanRemoteMap d) (Just $ pushNotifier d) + =<< networkRemotes {- Finds network remotes. -} -networkRemotes :: ThreadState -> IO [Remote] -networkRemotes st = runThreadState st $ +networkRemotes :: Assistant [Remote] +networkRemotes = liftAnnex $ filter (isNothing . Remote.localpath) <$> remoteList diff --git a/Assistant/Threads/PairListener.hs b/Assistant/Threads/PairListener.hs index 9875dcb8a..116cc0fa1 100644 --- a/Assistant/Threads/PairListener.hs +++ b/Assistant/Threads/PairListener.hs @@ -28,7 +28,7 @@ thisThread :: ThreadName thisThread = "PairListener" pairListenerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> UrlRenderer -> NamedThread -pairListenerThread st dstatus scanremotes urlrenderer = thread $ withSocketsDo $ +pairListenerThread st dstatus scanremotes urlrenderer = thread $ liftIO $ withSocketsDo $ runEvery (Seconds 1) $ void $ tryIO $ do sock <- getsock go sock [] [] diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs index dc7099e3d..591c8b18b 100644 --- a/Assistant/Threads/PushNotifier.hs +++ b/Assistant/Threads/PushNotifier.hs @@ -35,7 +35,7 @@ controllerThread pushnotifier a = forever $ do killThread tid pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread -pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ +pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ liftIO $ controllerThread pushnotifier $ do v <- runThreadState st $ getXMPPCreds case v of @@ -45,7 +45,7 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ loop c starttime = do void $ connectXMPP c $ \jid -> do fulljid <- bindJID jid - liftIO $ debug thisThread ["XMPP connected", show fulljid] + liftIO $ brokendebug thisThread ["XMPP connected", show fulljid] putStanza $ gitAnnexPresence gitAnnexSignature s <- getSession _ <- liftIO $ forkIO $ void $ runXMPP s $ @@ -54,10 +54,10 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ now <- getCurrentTime if diffUTCTime now starttime > 300 then do - debug thisThread ["XMPP connection lost; reconnecting"] + brokendebug thisThread ["XMPP connection lost; reconnecting"] loop c now else do - debug thisThread ["XMPP connection failed; will retry"] + brokendebug thisThread ["XMPP connection failed; will retry"] threadDelaySeconds (Seconds 300) loop c =<< getCurrentTime @@ -67,7 +67,7 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ receivenotifications = forever $ do s <- getStanza - liftIO $ debug thisThread ["received XMPP:", show s] + liftIO $ brokendebug thisThread ["received XMPP:", show s] case s of ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) -> liftIO $ pull st dstatus $ @@ -93,7 +93,7 @@ pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO () pull _ _ [] = noop pull st dstatus us = do rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus - debug thisThread $ "push notification for" : + brokendebug thisThread $ "push notification for" : map (fromUUID . Remote.uuid ) rs pullone rs =<< runThreadState st (inRepo Git.Branch.current) where diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 671a620b4..a15c0e152 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -25,12 +25,12 @@ thisThread = "Pusher" {- This thread retries pushes that failed before. -} pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread -pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do +pushRetryThread st dstatus pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds halfhour) $ do -- We already waited half an hour, now wait until there are failed -- pushes to retry. topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) unless (null topush) $ do - debug thisThread + brokendebug thisThread [ "retrying" , show (length topush) , "failed pushes" @@ -44,7 +44,7 @@ pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds hal {- This thread pushes git commits out to remotes soon after they are made. -} pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread -pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do +pushThread st dstatus commitchan pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made commits <- getCommits commitchan @@ -58,7 +58,7 @@ pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Secon void $ alertWhile dstatus (pushAlert remotes) $ pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes else do - debug thisThread + brokendebug thisThread [ "delaying push of" , show (length commits) , "commits" diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs index 912270090..6379eee46 100644 --- a/Assistant/Threads/SanityChecker.hs +++ b/Assistant/Threads/SanityChecker.hs @@ -11,60 +11,56 @@ module Assistant.Threads.SanityChecker ( import Assistant.Common import Assistant.DaemonStatus -import Assistant.ThreadedMonad -import Assistant.Changes import Assistant.Alert -import Assistant.TransferQueue import qualified Git.LsFiles import Utility.ThreadScheduler import qualified Assistant.Threads.Watcher as Watcher import Data.Time.Clock.POSIX -thisThread :: ThreadName -thisThread = "SanityChecker" - {- This thread wakes up occasionally to make sure the tree is in good shape. -} -sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread -sanityCheckerThread st dstatus transferqueue changechan = thread $ forever $ do - waitForNextCheck dstatus +sanityCheckerThread :: NamedThread +sanityCheckerThread = NamedThread "SanityChecker" $ forever $ do + waitForNextCheck - debug thisThread ["starting sanity check"] + debug ["starting sanity check"] - void $ alertWhile dstatus sanityCheckAlert go + dstatus <- getAssistant daemonStatusHandle + void $ alertWhile dstatus sanityCheckAlert <~> go - debug thisThread ["sanity check complete"] - where - thread = NamedThread thisThread - go = do - modifyDaemonStatus_ dstatus $ \s -> s - { sanityCheckRunning = True } + debug ["sanity check complete"] + where + go = do + dstatus <- getAssistant daemonStatusHandle + liftIO $ modifyDaemonStatus_ dstatus $ \s -> s + { sanityCheckRunning = True } + + now <- liftIO $ getPOSIXTime -- before check started + r <- either showerr return =<< tryIO <~> check - now <- getPOSIXTime -- before check started - r <- catchIO (check st dstatus transferqueue changechan) - $ \e -> do - runThreadState st $ warning $ show e - return False + liftIO $ modifyDaemonStatus_ dstatus $ \s -> s + { sanityCheckRunning = False + , lastSanityCheck = Just now + } - modifyDaemonStatus_ dstatus $ \s -> s - { sanityCheckRunning = False - , lastSanityCheck = Just now - } + return r - return r + showerr e = do + liftAnnex $ warning $ show e + return False {- Only run one check per day, from the time of the last check. -} -waitForNextCheck :: DaemonStatusHandle -> IO () -waitForNextCheck dstatus = do - v <- lastSanityCheck <$> getDaemonStatus dstatus - now <- getPOSIXTime - threadDelaySeconds $ Seconds $ calcdelay now v - where - calcdelay _ Nothing = oneDay - calcdelay now (Just lastcheck) - | lastcheck < now = max oneDay $ - oneDay - truncate (now - lastcheck) - | otherwise = oneDay +waitForNextCheck :: Assistant () +waitForNextCheck = do + v <- lastSanityCheck <$> daemonStatus + now <- liftIO getPOSIXTime + liftIO $ threadDelaySeconds $ Seconds $ calcdelay now v + where + calcdelay _ Nothing = oneDay + calcdelay now (Just lastcheck) + | lastcheck < now = max oneDay $ + oneDay - truncate (now - lastcheck) + | otherwise = oneDay oneDay :: Int oneDay = 24 * 60 * 60 @@ -72,29 +68,31 @@ oneDay = 24 * 60 * 60 {- It's important to stay out of the Annex monad as much as possible while - running potentially expensive parts of this check, since remaining in it - will block the watcher. -} -check :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO Bool -check st dstatus transferqueue changechan = do - g <- runThreadState st gitRepo +check :: Assistant Bool +check = do + g <- liftAnnex gitRepo -- Find old unstaged symlinks, and add them to git. - (unstaged, cleanup) <- Git.LsFiles.notInRepo False ["."] g - now <- getPOSIXTime + (unstaged, cleanup) <- liftIO $ Git.LsFiles.notInRepo False ["."] g + now <- liftIO $ getPOSIXTime forM_ unstaged $ \file -> do - ms <- catchMaybeIO $ getSymbolicLinkStatus file + ms <- liftIO $ catchMaybeIO $ getSymbolicLinkStatus file case ms of Just s | toonew (statusChangeTime s) now -> noop - | isSymbolicLink s -> - addsymlink file ms + | isSymbolicLink s -> addsymlink file ms _ -> noop - void cleanup + liftIO $ void cleanup return True - where - toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime) - slop = fromIntegral tenMinutes - insanity msg = do - runThreadState st $ warning msg - void $ addAlert dstatus $ sanityCheckFixAlert msg - addsymlink file s = do - Watcher.runHandler thisThread st dstatus - transferqueue changechan - Watcher.onAddSymlink file s - insanity $ "found unstaged symlink: " ++ file + where + toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime) + slop = fromIntegral tenMinutes + insanity msg = do + liftAnnex $ warning msg + dstatus <- getAssistant daemonStatusHandle + liftIO $ void $ addAlert dstatus $ sanityCheckFixAlert msg + addsymlink file s = do + d <- getAssistant id + liftIO $ Watcher.runHandler (threadName d) + (threadState d) (daemonStatusHandle d) + (transferQueue d) (changeChan d) + Watcher.onAddSymlink file s + insanity $ "found unstaged symlink: " ++ file diff --git a/Assistant/Threads/TransferPoller.hs b/Assistant/Threads/TransferPoller.hs index afead63ec..6f54336bb 100644 --- a/Assistant/Threads/TransferPoller.hs +++ b/Assistant/Threads/TransferPoller.hs @@ -8,7 +8,6 @@ module Assistant.Threads.TransferPoller where import Assistant.Common -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Logs.Transfer import Utility.NotificationBroadcaster @@ -17,46 +16,42 @@ import qualified Assistant.Threads.TransferWatcher as TransferWatcher import Control.Concurrent import qualified Data.Map as M -thisThread :: ThreadName -thisThread = "TransferPoller" - {- This thread polls the status of ongoing transfers, determining how much - of each transfer is complete. -} -transferPollerThread :: ThreadState -> DaemonStatusHandle -> NamedThread -transferPollerThread st dstatus = thread $ do - g <- runThreadState st gitRepo - tn <- newNotificationHandle =<< - transferNotifier <$> getDaemonStatus dstatus +transferPollerThread :: NamedThread +transferPollerThread = NamedThread "TransferPoller" $ do + g <- liftAnnex gitRepo + tn <- liftIO . newNotificationHandle =<< + transferNotifier <$> daemonStatus forever $ do - threadDelay 500000 -- 0.5 seconds - ts <- currentTransfers <$> getDaemonStatus dstatus + liftIO $ threadDelay 500000 -- 0.5 seconds + ts <- currentTransfers <$> daemonStatus if M.null ts - then waitNotification tn -- block until transfers running + -- block until transfers running + then liftIO $ waitNotification tn else mapM_ (poll g) $ M.toList ts - where - thread = NamedThread thisThread - poll g (t, info) - {- Downloads are polled by checking the size of the - - temp file being used for the transfer. -} - | transferDirection t == Download = do - let f = gitAnnexTmpLocation (transferKey t) g - sz <- catchMaybeIO $ - fromIntegral . fileSize - <$> getFileStatus f - newsize t info sz - {- Uploads don't need to be polled for when the - - TransferWatcher thread can track file - - modifications. -} - | TransferWatcher.watchesTransferSize = noop - {- Otherwise, this code polls the upload progress - - by reading the transfer info file. -} - | otherwise = do - let f = transferFile t g - mi <- catchDefaultIO Nothing $ - readTransferInfoFile Nothing f - maybe noop (newsize t info . bytesComplete) mi - newsize t info sz - | bytesComplete info /= sz && isJust sz = - alterTransferInfo dstatus t $ - \i -> i { bytesComplete = sz } - | otherwise = noop + where + poll g (t, info) + {- Downloads are polled by checking the size of the + - temp file being used for the transfer. -} + | transferDirection t == Download = do + let f = gitAnnexTmpLocation (transferKey t) g + sz <- liftIO $ catchMaybeIO $ + fromIntegral . fileSize <$> getFileStatus f + newsize t info sz + {- Uploads don't need to be polled for when the TransferWatcher + - thread can track file modifications. -} + | TransferWatcher.watchesTransferSize = noop + {- Otherwise, this code polls the upload progress + - by reading the transfer info file. -} + | otherwise = do + let f = transferFile t g + mi <- liftIO $ catchDefaultIO Nothing $ + readTransferInfoFile Nothing f + maybe noop (newsize t info . bytesComplete) mi + + newsize t info sz + | bytesComplete info /= sz && isJust sz = + alterTransferInfo t (\i -> i { bytesComplete = sz }) + <<~ daemonStatusHandle + | otherwise = noop diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs index 631c36b02..28df518aa 100644 --- a/Assistant/Threads/TransferScanner.hs +++ b/Assistant/Threads/TransferScanner.hs @@ -34,7 +34,7 @@ thisThread = "TransferScanner" - that need to be made, to keep data in sync. -} transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> NamedThread -transferScannerThread st dstatus scanremotes transferqueue = thread $ do +transferScannerThread st dstatus scanremotes transferqueue = thread $ liftIO $ do startupScan go S.empty where @@ -100,7 +100,7 @@ failedTransferScan st dstatus transferqueue r = do -} expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> [Remote] -> IO () expensiveScan st dstatus transferqueue rs = unless onlyweb $ do - liftIO $ debug thisThread ["starting scan of", show visiblers] + brokendebug thisThread ["starting scan of", show visiblers] void $ alertWhile dstatus (scanAlert visiblers) $ do g <- runThreadState st gitRepo (files, cleanup) <- LsFiles.inRepo [] g @@ -110,13 +110,13 @@ expensiveScan st dstatus transferqueue rs = unless onlyweb $ do mapM_ (enqueue f) ts void cleanup return True - liftIO $ debug thisThread ["finished scan of", show visiblers] + brokendebug thisThread ["finished scan of", show visiblers] where onlyweb = all (== webUUID) $ map Remote.uuid rs visiblers = let rs' = filter (not . Remote.readonly) rs in if null rs' then rs else rs' enqueue f (r, t) = do - debug thisThread ["queuing", show t] + brokendebug thisThread ["queuing", show t] queueTransferWhenSmall transferqueue dstatus (Just f) t r findtransfers f (key, _) = do locs <- loggedLocations key diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index 168ff2688..bb195e519 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -26,7 +26,7 @@ thisThread = "TransferWatcher" {- This thread watches for changes to the gitAnnexTransferDir, - and updates the DaemonStatus's map of ongoing transfers. -} transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> NamedThread -transferWatcherThread st dstatus transferqueue = thread $ do +transferWatcherThread st dstatus transferqueue = thread $ liftIO $ do g <- runThreadState st gitRepo let dir = gitAnnexTransferDir g createDirectoryIfMissing True dir @@ -38,7 +38,7 @@ transferWatcherThread st dstatus transferqueue = thread $ do , errHook = hook onErr } void $ watchDir dir (const False) hooks id - debug thisThread ["watching for transfers"] + brokendebug thisThread ["watching for transfers"] where thread = NamedThread thisThread @@ -66,7 +66,7 @@ onAdd st dstatus _ file _ = case parseTransferFile file of where go _ Nothing = noop -- transfer already finished go t (Just info) = do - debug thisThread + brokendebug thisThread [ "transfer starting:" , show t ] @@ -87,8 +87,9 @@ onModify _ dstatus _ file _ = do Just t -> go t =<< readTransferInfoFile Nothing file where go _ Nothing = noop - go t (Just newinfo) = alterTransferInfo dstatus t $ \info -> - info { bytesComplete = bytesComplete newinfo } + go t (Just newinfo) = alterTransferInfo t + (\i -> i { bytesComplete = bytesComplete newinfo }) + dstatus {- This thread can only watch transfer sizes when the DirWatcher supports - tracking modificatons to files. -} @@ -100,7 +101,7 @@ onDel :: Handler onDel st dstatus transferqueue file _ = case parseTransferFile file of Nothing -> noop Just t -> do - debug thisThread + brokendebug thisThread [ "transfer finishing:" , show t ] diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index 30d736073..2e880bef9 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -32,7 +32,7 @@ maxTransfers = 1 {- Dispatches transfers from the queue. -} transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> CommitChan -> NamedThread -transfererThread st dstatus transferqueue slots commitchan = thread $ go =<< readProgramFile +transfererThread st dstatus transferqueue slots commitchan = thread $ liftIO $ go =<< readProgramFile where thread = NamedThread thisThread go program = forever $ inTransferSlot dstatus slots $ @@ -47,11 +47,11 @@ startTransfer :: ThreadState -> DaemonStatusHandle -> CommitChan -> FilePath -> startTransfer st dstatus commitchan program t info = case (transferRemote info, associatedFile info) of (Just remote, Just file) -> ifM (runThreadState st $ shouldTransfer t info) ( do - debug thisThread [ "Transferring:" , show t ] + brokendebug thisThread [ "Transferring:" , show t ] notifyTransfer dstatus return $ Just (t, info, transferprocess remote file) , do - debug thisThread [ "Skipping unnecessary transfer:" , show t ] + brokendebug thisThread [ "Skipping unnecessary transfer:" , show t ] void $ removeTransfer dstatus t return Nothing ) diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index 5d24fe23f..7ab124b14 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -56,9 +56,9 @@ needLsof = error $ unlines ] watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread -watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do +watchThread st dstatus transferqueue changechan = NamedThread thisThread $ liftIO $ do void $ watchDir "." ignored hooks startup - debug thisThread [ "watching", "."] + brokendebug thisThread [ "watching", "."] where startup = startupScan st dstatus hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a @@ -132,7 +132,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l checkcontent key s ensurestaged link s , do - liftIO $ debug threadname ["fix symlink", file] + liftIO $ brokendebug threadname ["fix symlink", file] liftIO $ removeFile file liftIO $ createSymbolicLink link file checkcontent key =<< liftIO (getDaemonStatus dstatus) @@ -162,7 +162,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l {- For speed, tries to reuse the existing blob for symlink target. -} addlink link = do - liftIO $ debug threadname ["add symlink", file] + liftIO $ brokendebug threadname ["add symlink", file] v <- catObjectDetails $ Ref $ ':':file case v of Just (currlink, sha) @@ -187,7 +187,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l onDel :: Handler onDel threadname file _ _dstatus _ = do - liftIO $ debug threadname ["file deleted", file] + liftIO $ brokendebug threadname ["file deleted", file] Annex.Queue.addUpdateIndex =<< inRepo (Git.UpdateIndex.unstageFile file) madeChange file RmChange @@ -201,7 +201,7 @@ onDel threadname file _ _dstatus _ = do - just as good. -} onDelDir :: Handler onDelDir threadname dir _ _dstatus _ = do - liftIO $ debug threadname ["directory deleted", dir] + liftIO $ brokendebug threadname ["directory deleted", dir] Annex.Queue.addCommand "rm" [Params "--quiet -r --cached --ignore-unmatch --"] [dir] madeChange dir RmDirChange diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs index bb8fcd186..02911bab9 100644 --- a/Assistant/Threads/WebApp.hs +++ b/Assistant/Threads/WebApp.hs @@ -52,7 +52,7 @@ webAppThread -> Maybe (IO String) -> Maybe (Url -> FilePath -> IO ()) -> NamedThread -webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ do +webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ liftIO $ do webapp <- WebApp <$> pure assistantdata <*> (pack <$> genRandomToken) @@ -83,7 +83,7 @@ webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ (relHome =<< absPath =<< runThreadState (threadState assistantdata) (fromRepo repoPath)) go port webapp htmlshim urlfile = do - debug thisThread ["running on port", show port] + brokendebug thisThread ["running on port", show port] let url = myUrl webapp port maybe noop (`writeFile` url) urlfile writeHtmlShim url htmlshim |