diff options
author | Joey Hess <joey@kitenet.net> | 2012-10-29 02:21:04 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-10-29 02:21:04 -0400 |
commit | 579f63b6b756ca51b8f9fe53c3e668500718d91f (patch) | |
tree | 20039581df67e034ef434749d37de41e9802d21d | |
parent | 040f68d628120e112e22bfb7100f9650dec940c8 (diff) |
Assistant monad, stage 2.5
Converted several threads to run in the monad.
Added a lot of useful combinators for working with the monad.
Now the monad includes the name of the thread.
Some debugging messages are disabled pending converting other threads.
-rw-r--r-- | Assistant.hs | 14 | ||||
-rw-r--r-- | Assistant/Common.hs | 44 | ||||
-rw-r--r-- | Assistant/DaemonStatus.hs | 4 | ||||
-rw-r--r-- | Assistant/Monad.hs | 40 | ||||
-rw-r--r-- | Assistant/Sync.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/Committer.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/ConfigMonitor.hs | 4 | ||||
-rw-r--r-- | Assistant/Threads/DaemonStatus.hs | 27 | ||||
-rw-r--r-- | Assistant/Threads/Merger.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 8 | ||||
-rw-r--r-- | Assistant/Threads/NetWatcher.hs | 132 | ||||
-rw-r--r-- | Assistant/Threads/PairListener.hs | 2 | ||||
-rw-r--r-- | Assistant/Threads/PushNotifier.hs | 12 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 8 | ||||
-rw-r--r-- | Assistant/Threads/SanityChecker.hs | 114 | ||||
-rw-r--r-- | Assistant/Threads/TransferPoller.hs | 73 | ||||
-rw-r--r-- | Assistant/Threads/TransferScanner.hs | 8 | ||||
-rw-r--r-- | Assistant/Threads/TransferWatcher.hs | 13 | ||||
-rw-r--r-- | Assistant/Threads/Transferrer.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 12 | ||||
-rw-r--r-- | Assistant/Threads/WebApp.hs | 4 |
21 files changed, 280 insertions, 263 deletions
diff --git a/Assistant.hs b/Assistant.hs index bdca20fef..8a1be3130 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -179,7 +179,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do go = do d <- getAssistant id st <- getAssistant threadState - dstatus <- getAssistant daemonStatus + dstatus <- getAssistant daemonStatusHandle changechan <- getAssistant changeChan commitchan <- getAssistant commitChan pushmap <- getAssistant failedPushMap @@ -189,7 +189,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do branchhandle <- getAssistant branchChangeHandle pushnotifier <- getAssistant pushNotifier #ifdef WITH_WEBAPP - urlrenderer <- liftIO $ newUrlRenderer + urlrenderer <- liftIO newUrlRenderer #endif mapM_ (startthread d) [ watch $ commitThread st changechan commitchan transferqueue dstatus @@ -203,13 +203,13 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do , assist $ pushRetryThread st dstatus pushmap pushnotifier , assist $ mergeThread st dstatus transferqueue branchhandle , assist $ transferWatcherThread st dstatus transferqueue - , assist $ transferPollerThread st dstatus + , assist $ transferPollerThread , assist $ transfererThread st dstatus transferqueue transferslots commitchan - , assist $ daemonStatusThread st dstatus - , assist $ sanityCheckerThread st dstatus transferqueue changechan + , assist $ daemonStatusThread + , assist $ sanityCheckerThread , assist $ mountWatcherThread st dstatus scanremotes pushnotifier - , assist $ netWatcherThread st dstatus scanremotes pushnotifier - , assist $ netWatcherFallbackThread st dstatus scanremotes pushnotifier + , assist $ netWatcherThread + , assist $ netWatcherFallbackThread , assist $ transferScannerThread st dstatus scanremotes transferqueue , assist $ configMonitorThread st dstatus branchhandle commitchan #ifdef WITH_XMPP diff --git a/Assistant/Common.hs b/Assistant/Common.hs index a6c6b8935..b46d3342a 100644 --- a/Assistant/Common.hs +++ b/Assistant/Common.hs @@ -10,7 +10,8 @@ module Assistant.Common ( ThreadName, NamedThread(..), runNamedThread, - debug + debug, + brokendebug ) where import Common.Annex as X @@ -22,25 +23,28 @@ import System.Log.Logger import qualified Control.Exception as E type ThreadName = String -data NamedThread = NamedThread ThreadName (IO ()) +data NamedThread = NamedThread ThreadName (Assistant ()) -debug :: ThreadName -> [String] -> IO () -debug threadname ws = debugM threadname $ unwords $ (threadname ++ ":") : ws +brokendebug :: ThreadName -> [String] -> IO () +brokendebug _ _ = noop -- TODO remove this + +debug :: [String] -> Assistant () +debug ws = do + name <- getAssistant threadName + liftIO $ debugM name $ unwords $ (name ++ ":") : ws runNamedThread :: NamedThread -> Assistant () -runNamedThread (NamedThread name a) = liftIO . go =<< getAssistant daemonStatus - where - go dstatus = do - r <- E.try a :: IO (Either E.SomeException ()) - case r of - Right _ -> noop - Left e -> do - let msg = unwords - [ name - , "crashed:" - , show e - ] - hPutStrLn stderr msg - -- TODO click to restart - void $ addAlert dstatus $ - warningAlert name msg +runNamedThread (NamedThread name a) = do + d <- getAssistant id + liftIO . go $ d { threadName = name } + where + go d = do + r <- E.try (runAssistant a d) :: IO (Either E.SomeException ()) + case r of + Right _ -> noop + Left e -> do + let msg = unwords [name, "crashed:", show e] + hPutStrLn stderr msg + -- TODO click to restart + void $ addAlert (daemonStatusHandle d) $ + warningAlert name msg diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index 60b560b90..08cdbaf55 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -181,8 +181,8 @@ adjustTransfersSTM dstatus a = do putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) } {- Alters a transfer's info, if the transfer is in the map. -} -alterTransferInfo :: DaemonStatusHandle -> Transfer -> (TransferInfo -> TransferInfo) -> IO () -alterTransferInfo dstatus t a = updateTransferInfo' dstatus $ M.adjust a t +alterTransferInfo :: Transfer -> (TransferInfo -> TransferInfo) -> DaemonStatusHandle -> IO () +alterTransferInfo t a dstatus = updateTransferInfo' dstatus $ M.adjust a t {- Updates a transfer's info. Adds the transfer to the map if necessary, - or if already present, updates it while preserving the old transferTid, diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs index fa982b45e..ef9e7a4cb 100644 --- a/Assistant/Monad.hs +++ b/Assistant/Monad.hs @@ -13,7 +13,12 @@ module Assistant.Monad ( newAssistantData, runAssistant, getAssistant, - liftAnnex + liftAnnex, + (<~>), + (<<~), + daemonStatus, + asIO, + asIO2, ) where import "mtl" Control.Monad.Reader @@ -43,8 +48,9 @@ instance MonadBase IO Assistant where liftBase = Assistant . liftBase data AssistantData = AssistantData - { threadState :: ThreadState - , daemonStatus :: DaemonStatusHandle + { threadName :: String + , threadState :: ThreadState + , daemonStatusHandle :: DaemonStatusHandle , scanRemoteMap :: ScanRemoteMap , transferQueue :: TransferQueue , transferSlots :: TransferSlots @@ -57,7 +63,8 @@ data AssistantData = AssistantData newAssistantData :: ThreadState -> DaemonStatusHandle -> IO AssistantData newAssistantData st dstatus = AssistantData - <$> pure st + <$> pure "main" + <*> pure st <*> pure dstatus <*> newScanRemoteMap <*> newTransferQueue @@ -81,3 +88,28 @@ liftAnnex :: Annex a -> Assistant a liftAnnex a = do st <- reader threadState liftIO $ runThreadState st a + +{- Runs an IO action, passing it an IO action that runs an Assistant action. -} +(<~>) :: (IO a -> IO b) -> Assistant a -> Assistant b +io <~> a = do + d <- reader id + liftIO $ io $ runAssistant a d + +{- Creates an IO action that will run an Assistant action when run. -} +asIO :: (a -> Assistant b) -> Assistant (a -> IO b) +asIO a = do + d <- reader id + return $ \v -> runAssistant (a v) d + +{- Creates an IO action that will run an Assistant action when run. -} +asIO2 :: (a -> b -> Assistant c) -> Assistant (a -> b -> IO c) +asIO2 a = do + d <- reader id + return $ \v1 v2 -> runAssistant (a v1 v2) d + +{- Runs an IO action on a selected field of the AssistantData. -} +(<<~) :: (a -> IO b) -> (AssistantData -> a) -> Assistant b +io <<~ v = reader v >>= liftIO . io + +daemonStatus :: Assistant DaemonStatus +daemonStatus = getDaemonStatus <<~ daemonStatusHandle diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 8a9cf8985..bd23c7bb4 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -93,7 +93,7 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do where go _ Nothing _ _ _ = return True -- no branch, so nothing to do go shouldretry (Just branch) g u rs = do - debug threadname + brokendebug threadname [ "pushing to" , show rs ] @@ -117,12 +117,12 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do makemap l = M.fromList $ zip l (repeat now) retry branch g u rs = do - debug threadname [ "trying manual pull to resolve failed pushes" ] + brokendebug threadname [ "trying manual pull to resolve failed pushes" ] void $ manualPull st (Just branch) rs go False (Just branch) g u rs fallback branch g u rs = do - debug threadname + brokendebug threadname [ "fallback pushing to" , show rs ] diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index ceb885100..11c6dc584 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -42,7 +42,7 @@ thisThread = "Committer" {- This thread makes git commits at appropriate times. -} commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread -commitThread st changechan commitchan transferqueue dstatus = thread $ do +commitThread st changechan commitchan transferqueue dstatus = thread $ liftIO $ do delayadd <- runThreadState st $ maybe delayaddDefault (Just . Seconds) . readish <$> getConfig (annexConfig "delayadd") "" @@ -58,7 +58,7 @@ commitThread st changechan commitchan transferqueue dstatus = thread $ do readychanges <- handleAdds delayadd st changechan transferqueue dstatus changes if shouldCommit time readychanges then do - debug thisThread + brokendebug thisThread [ "committing" , show (length readychanges) , "changes" @@ -72,7 +72,7 @@ commitThread st changechan commitchan transferqueue dstatus = thread $ do thread = NamedThread thisThread refill [] = noop refill cs = do - debug thisThread + brokendebug thisThread [ "delaying commit of" , show (length cs) , "changes" diff --git a/Assistant/Threads/ConfigMonitor.hs b/Assistant/Threads/ConfigMonitor.hs index 7450d5c6b..2d5df48dd 100644 --- a/Assistant/Threads/ConfigMonitor.hs +++ b/Assistant/Threads/ConfigMonitor.hs @@ -38,7 +38,7 @@ thisThread = "ConfigMonitor" - be detected immediately. -} configMonitorThread :: ThreadState -> DaemonStatusHandle -> BranchChangeHandle -> CommitChan -> NamedThread -configMonitorThread st dstatus branchhandle commitchan = thread $ do +configMonitorThread st dstatus branchhandle commitchan = thread $ liftIO $ do r <- runThreadState st Annex.gitRepo go r =<< getConfigs r where @@ -50,7 +50,7 @@ configMonitorThread st dstatus branchhandle commitchan = thread $ do new <- getConfigs r when (old /= new) $ do let changedconfigs = new `S.difference` old - debug thisThread $ "reloading config" : + brokendebug thisThread $ "reloading config" : map fst (S.toList changedconfigs) reloadConfigs st dstatus changedconfigs {- Record a commit to get this config diff --git a/Assistant/Threads/DaemonStatus.hs b/Assistant/Threads/DaemonStatus.hs index f3174c86f..946bf1b05 100644 --- a/Assistant/Threads/DaemonStatus.hs +++ b/Assistant/Threads/DaemonStatus.hs @@ -9,28 +9,21 @@ module Assistant.Threads.DaemonStatus where import Assistant.Common import Assistant.DaemonStatus -import Assistant.ThreadedMonad import Utility.ThreadScheduler import Utility.NotificationBroadcaster -thisThread :: ThreadName -thisThread = "DaemonStatus" - {- This writes the daemon status to disk, when it changes, but no more - frequently than once every ten minutes. -} -daemonStatusThread :: ThreadState -> DaemonStatusHandle -> NamedThread -daemonStatusThread st dstatus = thread $ do - notifier <- newNotificationHandle - =<< changeNotifier <$> getDaemonStatus dstatus +daemonStatusThread :: NamedThread +daemonStatusThread = NamedThread "DaemonStatus" $ do + notifier <- liftIO . newNotificationHandle + =<< changeNotifier <$> daemonStatus checkpoint - runEvery (Seconds tenMinutes) $ do - waitNotification notifier + runEvery (Seconds tenMinutes) <~> do + liftIO $ waitNotification notifier checkpoint - where - thread = NamedThread thisThread - checkpoint = do - status <- getDaemonStatus dstatus - file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile - writeDaemonStatusFile file status - + where + checkpoint = do + file <- liftAnnex $ fromRepo gitAnnexDaemonStatusFile + liftIO . writeDaemonStatusFile file =<< daemonStatus diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs index e415a7562..152b40361 100644 --- a/Assistant/Threads/Merger.hs +++ b/Assistant/Threads/Merger.hs @@ -25,7 +25,7 @@ thisThread = "Merger" {- This thread watches for changes to .git/refs/, and handles incoming - pushes. -} mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> NamedThread -mergeThread st dstatus transferqueue branchchange = thread $ do +mergeThread st dstatus transferqueue branchchange = thread $ liftIO $ do g <- runThreadState st gitRepo let dir = Git.localGitDir g </> "refs" createDirectoryIfMissing True dir @@ -35,7 +35,7 @@ mergeThread st dstatus transferqueue branchchange = thread $ do , errHook = hook onErr } void $ watchDir dir (const False) hooks id - debug thisThread ["watching", dir] + brokendebug thisThread ["watching", dir] where thread = NamedThread thisThread @@ -81,7 +81,7 @@ onAdd st dstatus transferqueue branchchange file _ changedbranch = fileToBranch file mergecurrent (Just current) | equivBranches changedbranch current = do - liftIO $ debug thisThread + liftIO $ brokendebug thisThread [ "merging" , show changedbranch , "into" diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index afd1c223c..f36bb8874 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -40,7 +40,7 @@ thisThread :: ThreadName thisThread = "MountWatcher" mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread -mountWatcherThread st handle scanremotes pushnotifier = thread $ +mountWatcherThread st handle scanremotes pushnotifier = thread $ liftIO $ #if WITH_DBUS dbusThread st handle scanremotes pushnotifier #else @@ -93,7 +93,7 @@ checkMountMonitor client = do case running of [] -> startOneService client startableservices (service:_) -> do - debug thisThread [ "Using running DBUS service" + brokendebug thisThread [ "Using running DBUS service" , service , "to monitor mount events." ] @@ -111,7 +111,7 @@ startOneService client (x:xs) = do [toVariant x, toVariant (0 :: Word32)] ifM (elem x <$> listServiceNames client) ( do - debug thisThread [ "Started DBUS service" + brokendebug thisThread [ "Started DBUS service" , x , "to monitor mount events." ] @@ -160,7 +160,7 @@ handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted = handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO () handleMount st dstatus scanremotes pushnotifier dir = do - debug thisThread ["detected mount of", dir] + brokendebug thisThread ["detected mount of", dir] reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder st dstatus dir diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index ed64541c3..2af880e02 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -11,9 +11,6 @@ module Assistant.Threads.NetWatcher where import Assistant.Common -import Assistant.ThreadedMonad -import Assistant.DaemonStatus -import Assistant.ScanRemotes import Assistant.Sync import Assistant.Pushes import Utility.ThreadScheduler @@ -29,72 +26,67 @@ import Data.Word (Word32) #warning Building without dbus support; will poll for network connection changes #endif -thisThread :: ThreadName -thisThread = "NetWatcher" - -netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +netWatcherThread :: NamedThread #if WITH_DBUS -netWatcherThread st dstatus scanremotes pushnotifier = thread $ - dbusThread st dstatus scanremotes pushnotifier +netWatcherThread = thread dbusThread #else -netWatcherThread _ _ _ _ = thread noop +netWatcherThread = thread noop #endif - where - thread = NamedThread thisThread + where + thread = NamedThread "NetWatcher" {- This is a fallback for when dbus cannot be used to detect - network connection changes, but it also ensures that - any networked remotes that may have not been routable for a - while (despite the local network staying up), are synced with - periodically. -} -netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread -netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $ - runEvery (Seconds 3600) $ - handleConnection st dstatus scanremotes pushnotifier - where - thread = NamedThread thisThread +netWatcherFallbackThread :: NamedThread +netWatcherFallbackThread = NamedThread "NetWatcherFallback" $ + runEvery (Seconds 3600) <~> handleConnection #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () -dbusThread st dstatus scanremotes pushnotifier = - persistentClient getSystemAddress () onerr go - where - go client = ifM (checkNetMonitor client) - ( do - listenNMConnections client handleconn - listenWicdConnections client handleconn - , do - runThreadState st $ - warning "No known network monitor available through dbus; falling back to polling" - ) - handleconn = do - debug thisThread ["detected network connection"] - notifyRestart pushnotifier - handleConnection st dstatus scanremotes pushnotifier - onerr e _ = do - runThreadState st $ - warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")" - {- Wait, in hope that dbus will come back -} - threadDelaySeconds (Seconds 60) +dbusThread :: Assistant () +dbusThread = do + handleerr <- asIO2 onerr + runclient <- asIO go + liftIO $ persistentClient getSystemAddress () handleerr runclient + where + go client = ifM (checkNetMonitor client) + ( do + listenNMConnections client <~> handleconn + listenWicdConnections client <~> handleconn + , do + liftAnnex $ + warning "No known network monitor available through dbus; falling back to polling" + ) + handleconn = do + debug ["detected network connection"] + notifyRestart <<~ pushNotifier + handleConnection + onerr e _ = do + liftAnnex $ + warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")" + {- Wait, in hope that dbus will come back -} + liftIO $ threadDelaySeconds (Seconds 60) {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor network connections. -} -checkNetMonitor :: Client -> IO Bool +checkNetMonitor :: Client -> Assistant Bool checkNetMonitor client = do - running <- filter (`elem` [networkmanager, wicd]) + running <- liftIO $ filter (`elem` [networkmanager, wicd]) <$> listServiceNames client case running of [] -> return False (service:_) -> do - debug thisThread [ "Using running DBUS service" + debug [ "Using running DBUS service" , service , "to monitor network connection events." ] return True - where - networkmanager = "org.freedesktop.NetworkManager" - wicd = "org.wicd.daemon" + where + networkmanager = "org.freedesktop.NetworkManager" + wicd = "org.wicd.daemon" {- Listens for new NetworkManager connections. -} listenNMConnections :: Client -> IO () -> IO () @@ -102,18 +94,18 @@ listenNMConnections client callback = listen client matcher $ \event -> when (Just True == anyM activeconnection (signalBody event)) $ callback - where - matcher = matchAny - { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active" - , matchMember = Just "PropertiesChanged" - } - nm_connection_activated = toVariant (2 :: Word32) - nm_state_key = toVariant ("State" :: String) - activeconnection v = do - m <- fromVariant v - vstate <- lookup nm_state_key $ dictionaryItems m - state <- fromVariant vstate - return $ state == nm_connection_activated + where + matcher = matchAny + { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active" + , matchMember = Just "PropertiesChanged" + } + nm_connection_activated = toVariant (2 :: Word32) + nm_state_key = toVariant ("State" :: String) + activeconnection v = do + m <- fromVariant v + vstate <- lookup nm_state_key $ dictionaryItems m + state <- fromVariant vstate + return $ state == nm_connection_activated {- Listens for new Wicd connections. -} listenWicdConnections :: Client -> IO () -> IO () @@ -121,21 +113,23 @@ listenWicdConnections client callback = listen client matcher $ \event -> when (any (== wicd_success) (signalBody event)) $ callback - where - matcher = matchAny - { matchInterface = Just "org.wicd.daemon" - , matchMember = Just "ConnectResultsSent" - } - wicd_success = toVariant ("success" :: String) + where + matcher = matchAny + { matchInterface = Just "org.wicd.daemon" + , matchMember = Just "ConnectResultsSent" + } + wicd_success = toVariant ("success" :: String) #endif -handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () -handleConnection st dstatus scanremotes pushnotifier = - reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) - =<< networkRemotes st +handleConnection :: Assistant () +handleConnection = do + d <- getAssistant id + liftIO . reconnectRemotes (threadName d) (threadState d) + (daemonStatusHandle d) (scanRemoteMap d) (Just $ pushNotifier d) + =<< networkRemotes {- Finds network remotes. -} -networkRemotes :: ThreadState -> IO [Remote] -networkRemotes st = runThreadState st $ +networkRemotes :: Assistant [Remote] +networkRemotes = liftAnnex $ filter (isNothing . Remote.localpath) <$> remoteList diff --git a/Assistant/Threads/PairListener.hs b/Assistant/Threads/PairListener.hs index 9875dcb8a..116cc0fa1 100644 --- a/Assistant/Threads/PairListener.hs +++ b/Assistant/Threads/PairListener.hs @@ -28,7 +28,7 @@ thisThread :: ThreadName thisThread = "PairListener" pairListenerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> UrlRenderer -> NamedThread -pairListenerThread st dstatus scanremotes urlrenderer = thread $ withSocketsDo $ +pairListenerThread st dstatus scanremotes urlrenderer = thread $ liftIO $ withSocketsDo $ runEvery (Seconds 1) $ void $ tryIO $ do sock <- getsock go sock [] [] diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs index dc7099e3d..591c8b18b 100644 --- a/Assistant/Threads/PushNotifier.hs +++ b/Assistant/Threads/PushNotifier.hs @@ -35,7 +35,7 @@ controllerThread pushnotifier a = forever $ do killThread tid pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread -pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ +pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ liftIO $ controllerThread pushnotifier $ do v <- runThreadState st $ getXMPPCreds case v of @@ -45,7 +45,7 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ loop c starttime = do void $ connectXMPP c $ \jid -> do fulljid <- bindJID jid - liftIO $ debug thisThread ["XMPP connected", show fulljid] + liftIO $ brokendebug thisThread ["XMPP connected", show fulljid] putStanza $ gitAnnexPresence gitAnnexSignature s <- getSession _ <- liftIO $ forkIO $ void $ runXMPP s $ @@ -54,10 +54,10 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ now <- getCurrentTime if diffUTCTime now starttime > 300 then do - debug thisThread ["XMPP connection lost; reconnecting"] + brokendebug thisThread ["XMPP connection lost; reconnecting"] loop c now else do - debug thisThread ["XMPP connection failed; will retry"] + brokendebug thisThread ["XMPP connection failed; will retry"] threadDelaySeconds (Seconds 300) loop c =<< getCurrentTime @@ -67,7 +67,7 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ receivenotifications = forever $ do s <- getStanza - liftIO $ debug thisThread ["received XMPP:", show s] + liftIO $ brokendebug thisThread ["received XMPP:", show s] case s of ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) -> liftIO $ pull st dstatus $ @@ -93,7 +93,7 @@ pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO () pull _ _ [] = noop pull st dstatus us = do rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus - debug thisThread $ "push notification for" : + brokendebug thisThread $ "push notification for" : map (fromUUID . Remote.uuid ) rs pullone rs =<< runThreadState st (inRepo Git.Branch.current) where diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 671a620b4..a15c0e152 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -25,12 +25,12 @@ thisThread = "Pusher" {- This thread retries pushes that failed before. -} pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread -pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do +pushRetryThread st dstatus pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds halfhour) $ do -- We already waited half an hour, now wait until there are failed -- pushes to retry. topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) unless (null topush) $ do - debug thisThread + brokendebug thisThread [ "retrying" , show (length topush) , "failed pushes" @@ -44,7 +44,7 @@ pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds hal {- This thread pushes git commits out to remotes soon after they are made. -} pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread -pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do +pushThread st dstatus commitchan pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made commits <- getCommits commitchan @@ -58,7 +58,7 @@ pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Secon void $ alertWhile dstatus (pushAlert remotes) $ pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes else do - debug thisThread + brokendebug thisThread [ "delaying push of" , show (length commits) , "commits" diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs index 912270090..6379eee46 100644 --- a/Assistant/Threads/SanityChecker.hs +++ b/Assistant/Threads/SanityChecker.hs @@ -11,60 +11,56 @@ module Assistant.Threads.SanityChecker ( import Assistant.Common import Assistant.DaemonStatus -import Assistant.ThreadedMonad -import Assistant.Changes import Assistant.Alert -import Assistant.TransferQueue import qualified Git.LsFiles import Utility.ThreadScheduler import qualified Assistant.Threads.Watcher as Watcher import Data.Time.Clock.POSIX -thisThread :: ThreadName -thisThread = "SanityChecker" - {- This thread wakes up occasionally to make sure the tree is in good shape. -} -sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread -sanityCheckerThread st dstatus transferqueue changechan = thread $ forever $ do - waitForNextCheck dstatus +sanityCheckerThread :: NamedThread +sanityCheckerThread = NamedThread "SanityChecker" $ forever $ do + waitForNextCheck - debug thisThread ["starting sanity check"] + debug ["starting sanity check"] - void $ alertWhile dstatus sanityCheckAlert go + dstatus <- getAssistant daemonStatusHandle + void $ alertWhile dstatus sanityCheckAlert <~> go - debug thisThread ["sanity check complete"] - where - thread = NamedThread thisThread - go = do - modifyDaemonStatus_ dstatus $ \s -> s - { sanityCheckRunning = True } + debug ["sanity check complete"] + where + go = do + dstatus <- getAssistant daemonStatusHandle + liftIO $ modifyDaemonStatus_ dstatus $ \s -> s + { sanityCheckRunning = True } + + now <- liftIO $ getPOSIXTime -- before check started + r <- either showerr return =<< tryIO <~> check - now <- getPOSIXTime -- before check started - r <- catchIO (check st dstatus transferqueue changechan) - $ \e -> do - runThreadState st $ warning $ show e - return False + liftIO $ modifyDaemonStatus_ dstatus $ \s -> s + { sanityCheckRunning = False + , lastSanityCheck = Just now + } - modifyDaemonStatus_ dstatus $ \s -> s - { sanityCheckRunning = False - , lastSanityCheck = Just now - } + return r - return r + showerr e = do + liftAnnex $ warning $ show e + return False {- Only run one check per day, from the time of the last check. -} -waitForNextCheck :: DaemonStatusHandle -> IO () -waitForNextCheck dstatus = do - v <- lastSanityCheck <$> getDaemonStatus dstatus - now <- getPOSIXTime - threadDelaySeconds $ Seconds $ calcdelay now v - where - calcdelay _ Nothing = oneDay - calcdelay now (Just lastcheck) - | lastcheck < now = max oneDay $ - oneDay - truncate (now - lastcheck) - | otherwise = oneDay +waitForNextCheck :: Assistant () +waitForNextCheck = do + v <- lastSanityCheck <$> daemonStatus + now <- liftIO getPOSIXTime + liftIO $ threadDelaySeconds $ Seconds $ calcdelay now v + where + calcdelay _ Nothing = oneDay + calcdelay now (Just lastcheck) + | lastcheck < now = max oneDay $ + oneDay - truncate (now - lastcheck) + | otherwise = oneDay oneDay :: Int oneDay = 24 * 60 * 60 @@ -72,29 +68,31 @@ oneDay = 24 * 60 * 60 {- It's important to stay out of the Annex monad as much as possible while - running potentially expensive parts of this check, since remaining in it - will block the watcher. -} -check :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO Bool -check st dstatus transferqueue changechan = do - g <- runThreadState st gitRepo +check :: Assistant Bool +check = do + g <- liftAnnex gitRepo -- Find old unstaged symlinks, and add them to git. - (unstaged, cleanup) <- Git.LsFiles.notInRepo False ["."] g - now <- getPOSIXTime + (unstaged, cleanup) <- liftIO $ Git.LsFiles.notInRepo False ["."] g + now <- liftIO $ getPOSIXTime forM_ unstaged $ \file -> do - ms <- catchMaybeIO $ getSymbolicLinkStatus file + ms <- liftIO $ catchMaybeIO $ getSymbolicLinkStatus file case ms of Just s | toonew (statusChangeTime s) now -> noop - | isSymbolicLink s -> - addsymlink file ms + | isSymbolicLink s -> addsymlink file ms _ -> noop - void cleanup + liftIO $ void cleanup return True - where - toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime) - slop = fromIntegral tenMinutes - insanity msg = do - runThreadState st $ warning msg - void $ addAlert dstatus $ sanityCheckFixAlert msg - addsymlink file s = do - Watcher.runHandler thisThread st dstatus - transferqueue changechan - Watcher.onAddSymlink file s - insanity $ "found unstaged symlink: " ++ file + where + toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime) + slop = fromIntegral tenMinutes + insanity msg = do + liftAnnex $ warning msg + dstatus <- getAssistant daemonStatusHandle + liftIO $ void $ addAlert dstatus $ sanityCheckFixAlert msg + addsymlink file s = do + d <- getAssistant id + liftIO $ Watcher.runHandler (threadName d) + (threadState d) (daemonStatusHandle d) + (transferQueue d) (changeChan d) + Watcher.onAddSymlink file s + insanity $ "found unstaged symlink: " ++ file diff --git a/Assistant/Threads/TransferPoller.hs b/Assistant/Threads/TransferPoller.hs index afead63ec..6f54336bb 100644 --- a/Assistant/Threads/TransferPoller.hs +++ b/Assistant/Threads/TransferPoller.hs @@ -8,7 +8,6 @@ module Assistant.Threads.TransferPoller where import Assistant.Common -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Logs.Transfer import Utility.NotificationBroadcaster @@ -17,46 +16,42 @@ import qualified Assistant.Threads.TransferWatcher as TransferWatcher import Control.Concurrent import qualified Data.Map as M -thisThread :: ThreadName -thisThread = "TransferPoller" - {- This thread polls the status of ongoing transfers, determining how much - of each transfer is complete. -} -transferPollerThread :: ThreadState -> DaemonStatusHandle -> NamedThread -transferPollerThread st dstatus = thread $ do - g <- runThreadState st gitRepo - tn <- newNotificationHandle =<< - transferNotifier <$> getDaemonStatus dstatus +transferPollerThread :: NamedThread +transferPollerThread = NamedThread "TransferPoller" $ do + g <- liftAnnex gitRepo + tn <- liftIO . newNotificationHandle =<< + transferNotifier <$> daemonStatus forever $ do - threadDelay 500000 -- 0.5 seconds - ts <- currentTransfers <$> getDaemonStatus dstatus + liftIO $ threadDelay 500000 -- 0.5 seconds + ts <- currentTransfers <$> daemonStatus if M.null ts - then waitNotification tn -- block until transfers running + -- block until transfers running + then liftIO $ waitNotification tn else mapM_ (poll g) $ M.toList ts - where - thread = NamedThread thisThread - poll g (t, info) - {- Downloads are polled by checking the size of the - - temp file being used for the transfer. -} - | transferDirection t == Download = do - let f = gitAnnexTmpLocation (transferKey t) g - sz <- catchMaybeIO $ - fromIntegral . fileSize - <$> getFileStatus f - newsize t info sz - {- Uploads don't need to be polled for when the - - TransferWatcher thread can track file - - modifications. -} - | TransferWatcher.watchesTransferSize = noop - {- Otherwise, this code polls the upload progress - - by reading the transfer info file. -} - | otherwise = do - let f = transferFile t g - mi <- catchDefaultIO Nothing $ - readTransferInfoFile Nothing f - maybe noop (newsize t info . bytesComplete) mi - newsize t info sz - | bytesComplete info /= sz && isJust sz = - alterTransferInfo dstatus t $ - \i -> i { bytesComplete = sz } - | otherwise = noop + where + poll g (t, info) + {- Downloads are polled by checking the size of the + - temp file being used for the transfer. -} + | transferDirection t == Download = do + let f = gitAnnexTmpLocation (transferKey t) g + sz <- liftIO $ catchMaybeIO $ + fromIntegral . fileSize <$> getFileStatus f + newsize t info sz + {- Uploads don't need to be polled for when the TransferWatcher + - thread can track file modifications. -} + | TransferWatcher.watchesTransferSize = noop + {- Otherwise, this code polls the upload progress + - by reading the transfer info file. -} + | otherwise = do + let f = transferFile t g + mi <- liftIO $ catchDefaultIO Nothing $ + readTransferInfoFile Nothing f + maybe noop (newsize t info . bytesComplete) mi + + newsize t info sz + | bytesComplete info /= sz && isJust sz = + alterTransferInfo t (\i -> i { bytesComplete = sz }) + <<~ daemonStatusHandle + | otherwise = noop diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs index 631c36b02..28df518aa 100644 --- a/Assistant/Threads/TransferScanner.hs +++ b/Assistant/Threads/TransferScanner.hs @@ -34,7 +34,7 @@ thisThread = "TransferScanner" - that need to be made, to keep data in sync. -} transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> NamedThread -transferScannerThread st dstatus scanremotes transferqueue = thread $ do +transferScannerThread st dstatus scanremotes transferqueue = thread $ liftIO $ do startupScan go S.empty where @@ -100,7 +100,7 @@ failedTransferScan st dstatus transferqueue r = do -} expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> [Remote] -> IO () expensiveScan st dstatus transferqueue rs = unless onlyweb $ do - liftIO $ debug thisThread ["starting scan of", show visiblers] + brokendebug thisThread ["starting scan of", show visiblers] void $ alertWhile dstatus (scanAlert visiblers) $ do g <- runThreadState st gitRepo (files, cleanup) <- LsFiles.inRepo [] g @@ -110,13 +110,13 @@ expensiveScan st dstatus transferqueue rs = unless onlyweb $ do mapM_ (enqueue f) ts void cleanup return True - liftIO $ debug thisThread ["finished scan of", show visiblers] + brokendebug thisThread ["finished scan of", show visiblers] where onlyweb = all (== webUUID) $ map Remote.uuid rs visiblers = let rs' = filter (not . Remote.readonly) rs in if null rs' then rs else rs' enqueue f (r, t) = do - debug thisThread ["queuing", show t] + brokendebug thisThread ["queuing", show t] queueTransferWhenSmall transferqueue dstatus (Just f) t r findtransfers f (key, _) = do locs <- loggedLocations key diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index 168ff2688..bb195e519 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -26,7 +26,7 @@ thisThread = "TransferWatcher" {- This thread watches for changes to the gitAnnexTransferDir, - and updates the DaemonStatus's map of ongoing transfers. -} transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> NamedThread -transferWatcherThread st dstatus transferqueue = thread $ do +transferWatcherThread st dstatus transferqueue = thread $ liftIO $ do g <- runThreadState st gitRepo let dir = gitAnnexTransferDir g createDirectoryIfMissing True dir @@ -38,7 +38,7 @@ transferWatcherThread st dstatus transferqueue = thread $ do , errHook = hook onErr } void $ watchDir dir (const False) hooks id - debug thisThread ["watching for transfers"] + brokendebug thisThread ["watching for transfers"] where thread = NamedThread thisThread @@ -66,7 +66,7 @@ onAdd st dstatus _ file _ = case parseTransferFile file of where go _ Nothing = noop -- transfer already finished go t (Just info) = do - debug thisThread + brokendebug thisThread [ "transfer starting:" , show t ] @@ -87,8 +87,9 @@ onModify _ dstatus _ file _ = do Just t -> go t =<< readTransferInfoFile Nothing file where go _ Nothing = noop - go t (Just newinfo) = alterTransferInfo dstatus t $ \info -> - info { bytesComplete = bytesComplete newinfo } + go t (Just newinfo) = alterTransferInfo t + (\i -> i { bytesComplete = bytesComplete newinfo }) + dstatus {- This thread can only watch transfer sizes when the DirWatcher supports - tracking modificatons to files. -} @@ -100,7 +101,7 @@ onDel :: Handler onDel st dstatus transferqueue file _ = case parseTransferFile file of Nothing -> noop Just t -> do - debug thisThread + brokendebug thisThread [ "transfer finishing:" , show t ] diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index 30d736073..2e880bef9 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -32,7 +32,7 @@ maxTransfers = 1 {- Dispatches transfers from the queue. -} transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> CommitChan -> NamedThread -transfererThread st dstatus transferqueue slots commitchan = thread $ go =<< readProgramFile +transfererThread st dstatus transferqueue slots commitchan = thread $ liftIO $ go =<< readProgramFile where thread = NamedThread thisThread go program = forever $ inTransferSlot dstatus slots $ @@ -47,11 +47,11 @@ startTransfer :: ThreadState -> DaemonStatusHandle -> CommitChan -> FilePath -> startTransfer st dstatus commitchan program t info = case (transferRemote info, associatedFile info) of (Just remote, Just file) -> ifM (runThreadState st $ shouldTransfer t info) ( do - debug thisThread [ "Transferring:" , show t ] + brokendebug thisThread [ "Transferring:" , show t ] notifyTransfer dstatus return $ Just (t, info, transferprocess remote file) , do - debug thisThread [ "Skipping unnecessary transfer:" , show t ] + brokendebug thisThread [ "Skipping unnecessary transfer:" , show t ] void $ removeTransfer dstatus t return Nothing ) diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index 5d24fe23f..7ab124b14 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -56,9 +56,9 @@ needLsof = error $ unlines ] watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread -watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do +watchThread st dstatus transferqueue changechan = NamedThread thisThread $ liftIO $ do void $ watchDir "." ignored hooks startup - debug thisThread [ "watching", "."] + brokendebug thisThread [ "watching", "."] where startup = startupScan st dstatus hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a @@ -132,7 +132,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l checkcontent key s ensurestaged link s , do - liftIO $ debug threadname ["fix symlink", file] + liftIO $ brokendebug threadname ["fix symlink", file] liftIO $ removeFile file liftIO $ createSymbolicLink link file checkcontent key =<< liftIO (getDaemonStatus dstatus) @@ -162,7 +162,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l {- For speed, tries to reuse the existing blob for symlink target. -} addlink link = do - liftIO $ debug threadname ["add symlink", file] + liftIO $ brokendebug threadname ["add symlink", file] v <- catObjectDetails $ Ref $ ':':file case v of Just (currlink, sha) @@ -187,7 +187,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l onDel :: Handler onDel threadname file _ _dstatus _ = do - liftIO $ debug threadname ["file deleted", file] + liftIO $ brokendebug threadname ["file deleted", file] Annex.Queue.addUpdateIndex =<< inRepo (Git.UpdateIndex.unstageFile file) madeChange file RmChange @@ -201,7 +201,7 @@ onDel threadname file _ _dstatus _ = do - just as good. -} onDelDir :: Handler onDelDir threadname dir _ _dstatus _ = do - liftIO $ debug threadname ["directory deleted", dir] + liftIO $ brokendebug threadname ["directory deleted", dir] Annex.Queue.addCommand "rm" [Params "--quiet -r --cached --ignore-unmatch --"] [dir] madeChange dir RmDirChange diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs index bb8fcd186..02911bab9 100644 --- a/Assistant/Threads/WebApp.hs +++ b/Assistant/Threads/WebApp.hs @@ -52,7 +52,7 @@ webAppThread -> Maybe (IO String) -> Maybe (Url -> FilePath -> IO ()) -> NamedThread -webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ do +webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ liftIO $ do webapp <- WebApp <$> pure assistantdata <*> (pack <$> genRandomToken) @@ -83,7 +83,7 @@ webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ (relHome =<< absPath =<< runThreadState (threadState assistantdata) (fromRepo repoPath)) go port webapp htmlshim urlfile = do - debug thisThread ["running on port", show port] + brokendebug thisThread ["running on port", show port] let url = myUrl webapp port maybe noop (`writeFile` url) urlfile writeHtmlShim url htmlshim |