diff options
28 files changed, 1110 insertions, 1062 deletions
diff --git a/Assistant.hs b/Assistant.hs index cf92a8625..5b8b236e6 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -120,13 +120,6 @@ module Assistant where import Assistant.Common import Assistant.ThreadedMonad import Assistant.DaemonStatus -import Assistant.Changes -import Assistant.Commits -import Assistant.Pushes -import Assistant.ScanRemotes -import Assistant.BranchChange -import Assistant.TransferQueue -import Assistant.TransferSlots import Assistant.Threads.DaemonStatus import Assistant.Threads.Watcher import Assistant.Threads.Committer @@ -173,58 +166,60 @@ startDaemon assistant foreground webappwaiter logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile pidfile <- fromRepo gitAnnexPidFile go $ Utility.Daemon.daemonize logfd (Just pidfile) False - where - go d = startAssistant assistant d webappwaiter + where + go d = startAssistant assistant d webappwaiter startAssistant :: Bool -> (IO () -> IO ()) -> Maybe (String -> FilePath -> IO ()) -> Annex () startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do checkCanWatch dstatus <- startDaemonStatus - liftIO $ daemonize $ run dstatus st - where - run dstatus st = do - changechan <- newChangeChan - commitchan <- newCommitChan - pushmap <- newFailedPushMap - transferqueue <- newTransferQueue - transferslots <- newTransferSlots - scanremotes <- newScanRemoteMap - branchhandle <- newBranchChangeHandle - pushnotifier <- newPushNotifier + liftIO $ daemonize $ + runAssistant go =<< newAssistantData st dstatus + where + go = do + d <- getAssistant id + st <- getAssistant threadState + dstatus <- getAssistant daemonStatusHandle + commitchan <- getAssistant commitChan + transferqueue <- getAssistant transferQueue + transferslots <- getAssistant transferSlots + scanremotes <- getAssistant scanRemoteMap + pushnotifier <- getAssistant pushNotifier #ifdef WITH_WEBAPP - urlrenderer <- newUrlRenderer + urlrenderer <- liftIO newUrlRenderer #endif - mapM_ (startthread dstatus) - [ watch $ commitThread st changechan commitchan transferqueue dstatus + mapM_ (startthread d) + [ watch $ commitThread #ifdef WITH_WEBAPP - , assist $ webAppThread (Just st) dstatus scanremotes transferqueue transferslots pushnotifier commitchan urlrenderer Nothing webappwaiter + , assist $ webAppThread d urlrenderer False Nothing webappwaiter #ifdef WITH_PAIRING - , assist $ pairListenerThread st dstatus scanremotes urlrenderer + , assist $ pairListenerThread st dstatus scanremotes urlrenderer #endif #endif - , assist $ pushThread st dstatus commitchan pushmap pushnotifier - , assist $ pushRetryThread st dstatus pushmap pushnotifier - , assist $ mergeThread st dstatus transferqueue branchhandle - , assist $ transferWatcherThread st dstatus transferqueue - , assist $ transferPollerThread st dstatus - , assist $ transfererThread st dstatus transferqueue transferslots commitchan - , assist $ daemonStatusThread st dstatus - , assist $ sanityCheckerThread st dstatus transferqueue changechan - , assist $ mountWatcherThread st dstatus scanremotes pushnotifier - , assist $ netWatcherThread st dstatus scanremotes pushnotifier - , assist $ netWatcherFallbackThread st dstatus scanremotes pushnotifier - , assist $ transferScannerThread st dstatus scanremotes transferqueue - , assist $ configMonitorThread st dstatus branchhandle commitchan + , assist $ pushThread + , assist $ pushRetryThread + , assist $ mergeThread + , assist $ transferWatcherThread + , assist $ transferPollerThread + , assist $ transfererThread st dstatus transferqueue transferslots commitchan + , assist $ daemonStatusThread + , assist $ sanityCheckerThread + , assist $ mountWatcherThread + , assist $ netWatcherThread + , assist $ netWatcherFallbackThread + , assist $ transferScannerThread + , assist $ configMonitorThread #ifdef WITH_XMPP - , assist $ pushNotifierThread st dstatus pushnotifier + , assist $ pushNotifierThread #endif - , watch $ watchThread st dstatus transferqueue changechan - ] - waitForTermination + , watch $ watchThread + ] + liftIO waitForTermination - watch a = (True, a) - assist a = (False, a) - startthread dstatus (watcher, t) - | watcher || assistant = void $ forkIO $ - runNamedThread dstatus t - | otherwise = noop + watch a = (True, a) + assist a = (False, a) + startthread d (watcher, t) + | watcher || assistant = void $ liftIO $ forkIO $ + flip runAssistant d $ + runNamedThread t + | otherwise = noop diff --git a/Assistant/BranchChange.hs b/Assistant/BranchChange.hs index d1d1c20df..cf7080f90 100644 --- a/Assistant/BranchChange.hs +++ b/Assistant/BranchChange.hs @@ -8,7 +8,7 @@ module Assistant.BranchChange where import Control.Concurrent.MSampleVar -import Assistant.Common +import Common.Annex newtype BranchChangeHandle = BranchChangeHandle (MSampleVar ()) diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs index cccc372c1..b20dce09a 100644 --- a/Assistant/Changes.hs +++ b/Assistant/Changes.hs @@ -8,7 +8,6 @@ module Assistant.Changes where import Common.Annex -import qualified Annex.Queue import Types.KeySource import Utility.TSet @@ -39,19 +38,15 @@ newChangeChan :: IO ChangeChan newChangeChan = newTSet {- Handlers call this when they made a change that needs to get committed. -} -madeChange :: FilePath -> ChangeType -> Annex (Maybe Change) -madeChange f t = do - -- Just in case the commit thread is not flushing the queue fast enough. - Annex.Queue.flushWhenFull - liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t) +madeChange :: FilePath -> ChangeType -> IO (Maybe Change) +madeChange f t = Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t) -noChange :: Annex (Maybe Change) +noChange :: IO (Maybe Change) noChange = return Nothing {- Indicates an add needs to be done, but has not started yet. -} -pendingAddChange :: FilePath -> Annex (Maybe Change) -pendingAddChange f = - liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure f) +pendingAddChange :: FilePath -> IO (Maybe Change) +pendingAddChange f = Just <$> (PendingAddChange <$> getCurrentTime <*> pure f) isPendingAddChange :: Change -> Bool isPendingAddChange (PendingAddChange {}) = True diff --git a/Assistant/Common.hs b/Assistant/Common.hs index d6df77f69..b46d3342a 100644 --- a/Assistant/Common.hs +++ b/Assistant/Common.hs @@ -10,36 +10,41 @@ module Assistant.Common ( ThreadName, NamedThread(..), runNamedThread, - debug + debug, + brokendebug ) where import Common.Annex as X -import Assistant.DaemonStatus +import Assistant.Monad as X import Assistant.Alert +import Assistant.DaemonStatus import System.Log.Logger import qualified Control.Exception as E type ThreadName = String -data NamedThread = NamedThread ThreadName (IO ()) +data NamedThread = NamedThread ThreadName (Assistant ()) + +brokendebug :: ThreadName -> [String] -> IO () +brokendebug _ _ = noop -- TODO remove this -debug :: ThreadName -> [String] -> IO () -debug threadname ws = debugM threadname $ unwords $ (threadname ++ ":") : ws +debug :: [String] -> Assistant () +debug ws = do + name <- getAssistant threadName + liftIO $ debugM name $ unwords $ (name ++ ":") : ws -runNamedThread :: DaemonStatusHandle -> NamedThread -> IO () -runNamedThread dstatus (NamedThread name a) = go - where - go = do - r <- E.try a :: IO (Either E.SomeException ()) - case r of - Right _ -> noop - Left e -> do - let msg = unwords - [ name - , "crashed:" - , show e - ] - hPutStrLn stderr msg - -- TODO click to restart - void $ addAlert dstatus $ - warningAlert name msg +runNamedThread :: NamedThread -> Assistant () +runNamedThread (NamedThread name a) = do + d <- getAssistant id + liftIO . go $ d { threadName = name } + where + go d = do + r <- E.try (runAssistant a d) :: IO (Either E.SomeException ()) + case r of + Right _ -> noop + Left e -> do + let msg = unwords [name, "crashed:", show e] + hPutStrLn stderr msg + -- TODO click to restart + void $ addAlert (daemonStatusHandle d) $ + warningAlert name msg diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index 60b560b90..08cdbaf55 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -181,8 +181,8 @@ adjustTransfersSTM dstatus a = do putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) } {- Alters a transfer's info, if the transfer is in the map. -} -alterTransferInfo :: DaemonStatusHandle -> Transfer -> (TransferInfo -> TransferInfo) -> IO () -alterTransferInfo dstatus t a = updateTransferInfo' dstatus $ M.adjust a t +alterTransferInfo :: Transfer -> (TransferInfo -> TransferInfo) -> DaemonStatusHandle -> IO () +alterTransferInfo t a dstatus = updateTransferInfo' dstatus $ M.adjust a t {- Updates a transfer's info. Adds the transfer to the map if necessary, - or if already present, updates it while preserving the old transferTid, diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs new file mode 100644 index 000000000..ef9e7a4cb --- /dev/null +++ b/Assistant/Monad.hs @@ -0,0 +1,115 @@ +{- git-annex assistant monad + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE PackageImports, GeneralizedNewtypeDeriving, TypeFamilies, MultiParamTypeClasses #-} + +module Assistant.Monad ( + Assistant, + AssistantData(..), + newAssistantData, + runAssistant, + getAssistant, + liftAnnex, + (<~>), + (<<~), + daemonStatus, + asIO, + asIO2, +) where + +import "mtl" Control.Monad.Reader +import Control.Monad.Base (liftBase, MonadBase) + +import Common.Annex +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.ScanRemotes +import Assistant.TransferQueue +import Assistant.TransferSlots +import Assistant.Pushes +import Assistant.Commits +import Assistant.Changes +import Assistant.BranchChange + +newtype Assistant a = Assistant { mkAssistant :: ReaderT AssistantData IO a } + deriving ( + Monad, + MonadIO, + MonadReader AssistantData, + Functor, + Applicative + ) + +instance MonadBase IO Assistant where + liftBase = Assistant . liftBase + +data AssistantData = AssistantData + { threadName :: String + , threadState :: ThreadState + , daemonStatusHandle :: DaemonStatusHandle + , scanRemoteMap :: ScanRemoteMap + , transferQueue :: TransferQueue + , transferSlots :: TransferSlots + , pushNotifier :: PushNotifier + , failedPushMap :: FailedPushMap + , commitChan :: CommitChan + , changeChan :: ChangeChan + , branchChangeHandle :: BranchChangeHandle + } + +newAssistantData :: ThreadState -> DaemonStatusHandle -> IO AssistantData +newAssistantData st dstatus = AssistantData + <$> pure "main" + <*> pure st + <*> pure dstatus + <*> newScanRemoteMap + <*> newTransferQueue + <*> newTransferSlots + <*> newPushNotifier + <*> newFailedPushMap + <*> newCommitChan + <*> newChangeChan + <*> newBranchChangeHandle + +runAssistant :: Assistant a -> AssistantData -> IO a +runAssistant a = runReaderT (mkAssistant a) + +getAssistant :: (AssistantData -> a) -> Assistant a +getAssistant = reader + +{- Runs an action in the git-annex monad. Note that the same monad state + - is shared amoung all assistant threads, so only one of these can run at + - a time. Therefore, long-duration actions should be avoided. -} +liftAnnex :: Annex a -> Assistant a +liftAnnex a = do + st <- reader threadState + liftIO $ runThreadState st a + +{- Runs an IO action, passing it an IO action that runs an Assistant action. -} +(<~>) :: (IO a -> IO b) -> Assistant a -> Assistant b +io <~> a = do + d <- reader id + liftIO $ io $ runAssistant a d + +{- Creates an IO action that will run an Assistant action when run. -} +asIO :: (a -> Assistant b) -> Assistant (a -> IO b) +asIO a = do + d <- reader id + return $ \v -> runAssistant (a v) d + +{- Creates an IO action that will run an Assistant action when run. -} +asIO2 :: (a -> b -> Assistant c) -> Assistant (a -> b -> IO c) +asIO2 a = do + d <- reader id + return $ \v1 v2 -> runAssistant (a v1 v2) d + +{- Runs an IO action on a selected field of the AssistantData. -} +(<<~) :: (a -> IO b) -> (AssistantData -> a) -> Assistant b +io <<~ v = reader v >>= liftIO . io + +daemonStatus :: Assistant DaemonStatus +daemonStatus = getDaemonStatus <<~ daemonStatusHandle diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 8a9cf8985..bd23c7bb4 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -93,7 +93,7 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do where go _ Nothing _ _ _ = return True -- no branch, so nothing to do go shouldretry (Just branch) g u rs = do - debug threadname + brokendebug threadname [ "pushing to" , show rs ] @@ -117,12 +117,12 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do makemap l = M.fromList $ zip l (repeat now) retry branch g u rs = do - debug threadname [ "trying manual pull to resolve failed pushes" ] + brokendebug threadname [ "trying manual pull to resolve failed pushes" ] void $ manualPull st (Just branch) rs go False (Just branch) g u rs fallback branch g u rs = do - debug threadname + brokendebug threadname [ "fallback pushing to" , show rs ] diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index ceb885100..2ab693f05 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -13,7 +13,6 @@ import Assistant.Common import Assistant.Changes import Assistant.Commits import Assistant.Alert -import Assistant.ThreadedMonad import Assistant.Threads.Watcher import Assistant.TransferQueue import Assistant.DaemonStatus @@ -37,48 +36,40 @@ import Data.Tuple.Utils import qualified Data.Set as S import Data.Either -thisThread :: ThreadName -thisThread = "Committer" - {- This thread makes git commits at appropriate times. -} -commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread -commitThread st changechan commitchan transferqueue dstatus = thread $ do - delayadd <- runThreadState st $ +commitThread :: NamedThread +commitThread = NamedThread "Committer" $ do + delayadd <- liftAnnex $ maybe delayaddDefault (Just . Seconds) . readish <$> getConfig (annexConfig "delayadd") "" - runEvery (Seconds 1) $ do + runEvery (Seconds 1) <~> do -- We already waited one second as a simple rate limiter. -- Next, wait until at least one change is available for -- processing. - changes <- getChanges changechan + changes <- getChanges <<~ changeChan -- Now see if now's a good time to commit. - time <- getCurrentTime + time <- liftIO getCurrentTime if shouldCommit time changes then do - readychanges <- handleAdds delayadd st changechan transferqueue dstatus changes + readychanges <- handleAdds delayadd changes if shouldCommit time readychanges then do - debug thisThread + debug [ "committing" , show (length readychanges) , "changes" ] - void $ alertWhile dstatus commitAlert $ - runThreadState st commitStaged - recordCommit commitchan + dstatus <- getAssistant daemonStatusHandle + void $ alertWhile dstatus commitAlert <~> + liftAnnex commitStaged + recordCommit <<~ commitChan else refill readychanges else refill changes - where - thread = NamedThread thisThread - refill [] = noop - refill cs = do - debug thisThread - [ "delaying commit of" - , show (length cs) - , "changes" - ] - refillChanges changechan cs - + where + refill [] = noop + refill cs = do + debug ["delaying commit of", show (length cs), "changes"] + flip refillChanges cs <<~ changeChan commitStaged :: Annex Bool commitStaged = do @@ -99,12 +90,12 @@ commitStaged = do - each other out, etc. Git returns nonzero on those, - so don't propigate out commit failures. -} return True - where - nomessage ps - | Git.Version.older "1.7.2" = Param "-m" - : Param "autocommit" : ps - | otherwise = Param "--allow-empty-message" - : Param "-m" : Param "" : ps + where + nomessage ps + | Git.Version.older "1.7.2" = Param "-m" + : Param "autocommit" : ps + | otherwise = Param "--allow-empty-message" + : Param "-m" : Param "" : ps {- Decide if now is a good time to make a commit. - Note that the list of change times has an undefined order. @@ -118,9 +109,9 @@ shouldCommit now changes | len > 10000 = True -- avoid bloating queue too much | length (filter thisSecond changes) < 10 = True | otherwise = False -- batch activity - where - len = length changes - thisSecond c = now `diffUTCTime` changeTime c <= 1 + where + len = length changes + thisSecond c = now `diffUTCTime` changeTime c <= 1 {- OSX needs a short delay after a file is added before locking it down, - as pasting a file seems to try to set file permissions or otherwise @@ -152,77 +143,82 @@ delayaddDefault = Nothing - Any pending adds that are not ready yet are put back into the ChangeChan, - where they will be retried later. -} -handleAdds :: Maybe Seconds -> ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change] -handleAdds delayadd st changechan transferqueue dstatus cs = returnWhen (null incomplete) $ do +handleAdds :: Maybe Seconds -> [Change] -> Assistant [Change] +handleAdds delayadd cs = returnWhen (null incomplete) $ do let (pending, inprocess) = partition isPendingAddChange incomplete pending' <- findnew pending - (postponed, toadd) <- partitionEithers <$> safeToAdd delayadd st pending' inprocess + (postponed, toadd) <- partitionEithers <$> safeToAdd delayadd pending' inprocess unless (null postponed) $ - refillChanges changechan postponed + flip refillChanges postponed <<~ changeChan returnWhen (null toadd) $ do added <- catMaybes <$> forM toadd add if DirWatcher.eventsCoalesce || null added then return $ added ++ otherchanges else do - r <- handleAdds delayadd st changechan transferqueue dstatus - =<< getChanges changechan + r <- handleAdds delayadd + =<< getChanges <<~ changeChan return $ r ++ added ++ otherchanges - where - (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs + where + (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs - findnew [] = return [] - findnew pending@(exemplar:_) = do - (!newfiles, cleanup) <- runThreadState st $ - inRepo (Git.LsFiles.notInRepo False $ map changeFile pending) - void cleanup - -- note: timestamp info is lost here - let ts = changeTime exemplar - return $ map (PendingAddChange ts) newfiles + findnew [] = return [] + findnew pending@(exemplar:_) = do + (!newfiles, cleanup) <- liftAnnex $ + inRepo (Git.LsFiles.notInRepo False $ map changeFile pending) + void $ liftIO cleanup + -- note: timestamp info is lost here + let ts = changeTime exemplar + return $ map (PendingAddChange ts) newfiles + + returnWhen c a + | c = return otherchanges + | otherwise = a - returnWhen c a - | c = return otherchanges - | otherwise = a + add :: Change -> Assistant (Maybe Change) + add change@(InProcessAddChange { keySource = ks }) = do + dstatus <- getAssistant daemonStatusHandle + alertWhile' dstatus (addFileAlert $ keyFilename ks) <~> add' change ks + add _ = return Nothing - add :: Change -> IO (Maybe Change) - add change@(InProcessAddChange { keySource = ks }) = - alertWhile' dstatus (addFileAlert $ keyFilename ks) $ - liftM ret $ catchMaybeIO $ - sanitycheck ks $ runThreadState st $ do - showStart "add" $ keyFilename ks - key <- Command.Add.ingest ks - done (finishedChange change) (keyFilename ks) key - where - {- Add errors tend to be transient and will - - be automatically dealt with, so don't - - pass to the alert code. -} - ret (Just j@(Just _)) = (True, j) - ret _ = (True, Nothing) - add _ = return Nothing + add' change ks = liftM ret $ catchMaybeIO <~> do + sanitycheck ks $ do + key <- liftAnnex $ do + showStart "add" $ keyFilename ks + Command.Add.ingest ks + done (finishedChange change) (keyFilename ks) key + where + {- Add errors tend to be transient and will be automatically + - dealt with, so don't pass to the alert code. -} + ret (Just j@(Just _)) = (True, j) + ret _ = (True, Nothing) - done _ _ Nothing = do - showEndFail - return Nothing - done change file (Just key) = do - link <- Command.Add.link file key True - when DirWatcher.eventsCoalesce $ do + done _ _ Nothing = do + liftAnnex showEndFail + return Nothing + done change file (Just key) = do + link <- liftAnnex $ Command.Add.link file key True + when DirWatcher.eventsCoalesce $ + liftAnnex $ do sha <- inRepo $ Git.HashObject.hashObject BlobObject link stageSymlink file sha - queueTransfers Next transferqueue dstatus key (Just file) Upload - showEndOk - return $ Just change + showEndOk + transferqueue <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftAnnex $ queueTransfers Next transferqueue dstatus key (Just file) Upload + return $ Just change - {- Check that the keysource's keyFilename still exists, - - and is still a hard link to its contentLocation, - - before ingesting it. -} - sanitycheck keysource a = do - fs <- getSymbolicLinkStatus $ keyFilename keysource - ks <- getSymbolicLinkStatus $ contentLocation keysource - if deviceID ks == deviceID fs && fileID ks == fileID fs - then a - else return Nothing + {- Check that the keysource's keyFilename still exists, + - and is still a hard link to its contentLocation, + - before ingesting it. -} + sanitycheck keysource a = do + fs <- liftIO $ getSymbolicLinkStatus $ keyFilename keysource + ks <- liftIO $ getSymbolicLinkStatus $ contentLocation keysource + if deviceID ks == deviceID fs && fileID ks == fileID fs + then a + else return Nothing {- Files can Either be Right to be added now, - or are unsafe, and must be Left for later. @@ -230,11 +226,11 @@ handleAdds delayadd st changechan transferqueue dstatus cs = returnWhen (null in - Check by running lsof on the temp directory, which - the KeySources are locked down in. -} -safeToAdd :: Maybe Seconds -> ThreadState -> [Change] -> [Change] -> IO [Either Change Change] -safeToAdd _ _ [] [] = return [] -safeToAdd delayadd st pending inprocess = do - maybe noop threadDelaySeconds delayadd - runThreadState st $ do +safeToAdd :: Maybe Seconds -> [Change] -> [Change] -> Assistant [Either Change Change] +safeToAdd _ [] [] = return [] +safeToAdd delayadd pending inprocess = do + maybe noop (liftIO . threadDelaySeconds) delayadd + liftAnnex $ do keysources <- mapM Command.Add.lockDown (map changeFile pending) let inprocess' = map mkinprocess (zip pending keysources) tmpdir <- fromRepo gitAnnexTmpDir @@ -250,25 +246,24 @@ safeToAdd delayadd st pending inprocess = do mapM_ canceladd $ lefts checked allRight $ rights checked else return checked - where - check openfiles change@(InProcessAddChange { keySource = ks }) - | S.member (contentLocation ks) openfiles = Left change - check _ change = Right change + where + check openfiles change@(InProcessAddChange { keySource = ks }) + | S.member (contentLocation ks) openfiles = Left change + check _ change = Right change - mkinprocess (c, ks) = InProcessAddChange - { changeTime = changeTime c - , keySource = ks - } + mkinprocess (c, ks) = InProcessAddChange + { changeTime = changeTime c + , keySource = ks + } - canceladd (InProcessAddChange { keySource = ks }) = do - warning $ keyFilename ks - ++ " still has writers, not adding" - -- remove the hard link - void $ liftIO $ tryIO $ - removeFile $ contentLocation ks - canceladd _ = noop + canceladd (InProcessAddChange { keySource = ks }) = do + warning $ keyFilename ks + ++ " still has writers, not adding" + -- remove the hard link + void $ liftIO $ tryIO $ removeFile $ contentLocation ks + canceladd _ = noop - openwrite (_file, mode, _pid) = - mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite + openwrite (_file, mode, _pid) = + mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite - allRight = return . map Right + allRight = return . map Right diff --git a/Assistant/Threads/ConfigMonitor.hs b/Assistant/Threads/ConfigMonitor.hs index 7450d5c6b..fe98b10e8 100644 --- a/Assistant/Threads/ConfigMonitor.hs +++ b/Assistant/Threads/ConfigMonitor.hs @@ -9,7 +9,6 @@ module Assistant.Threads.ConfigMonitor where import Assistant.Common import Assistant.BranchChange -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Commits import Utility.ThreadScheduler @@ -19,10 +18,8 @@ import Logs.Remote import Logs.PreferredContent import Logs.Group import Remote.List (remoteListRefresh) -import qualified Git import qualified Git.LsTree as LsTree import qualified Annex.Branch -import qualified Annex import qualified Data.Set as S @@ -37,26 +34,22 @@ thisThread = "ConfigMonitor" - if the branch has not changed in a while, configuration changes will - be detected immediately. -} -configMonitorThread :: ThreadState -> DaemonStatusHandle -> BranchChangeHandle -> CommitChan -> NamedThread -configMonitorThread st dstatus branchhandle commitchan = thread $ do - r <- runThreadState st Annex.gitRepo - go r =<< getConfigs r - where - thread = NamedThread thisThread - - go r old = do - threadDelaySeconds (Seconds 60) - waitBranchChange branchhandle - new <- getConfigs r - when (old /= new) $ do - let changedconfigs = new `S.difference` old - debug thisThread $ "reloading config" : - map fst (S.toList changedconfigs) - reloadConfigs st dstatus changedconfigs - {- Record a commit to get this config - - change pushed out to remotes. -} - recordCommit commitchan - go r new +configMonitorThread :: NamedThread +configMonitorThread = NamedThread "ConfigMonitor" $ loop =<< getConfigs + where + loop old = do + liftIO $ threadDelaySeconds (Seconds 60) + waitBranchChange <<~ branchChangeHandle + new <- getConfigs + when (old /= new) $ do + let changedconfigs = new `S.difference` old + debug $ "reloading config" : + map fst (S.toList changedconfigs) + reloadConfigs new + {- Record a commit to get this config + - change pushed out to remotes. -} + recordCommit <<~ commitChan + loop new {- Config files, and their checksums. -} type Configs = S.Set (FilePath, String) @@ -73,22 +66,23 @@ configFilesActions = , (preferredContentLog, noop) ] -reloadConfigs :: ThreadState -> DaemonStatusHandle -> Configs -> IO () -reloadConfigs st dstatus changedconfigs = runThreadState st $ do - sequence_ as - void preferredContentMapLoad +reloadConfigs :: Configs -> Assistant () +reloadConfigs changedconfigs = do + liftAnnex $ do + sequence_ as + void preferredContentMapLoad {- Changes to the remote log, or the trust log, can affect the - syncRemotes list -} - when (Logs.Remote.remoteLog `elem` fs || Logs.Trust.trustLog `elem` fs) $ - updateSyncRemotes dstatus - where - (fs, as) = unzip $ filter (flip S.member changedfiles . fst) - configFilesActions - changedfiles = S.map fst changedconfigs + when (Logs.Remote.remoteLog `elem` fs || Logs.Trust.trustLog `elem` fs) $ + liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle + where + (fs, as) = unzip $ filter (flip S.member changedfiles . fst) + configFilesActions + changedfiles = S.map fst changedconfigs -getConfigs :: Git.Repo -> IO Configs -getConfigs r = S.fromList . map extract - <$> LsTree.lsTreeFiles Annex.Branch.fullname files r - where - files = map fst configFilesActions - extract treeitem = (LsTree.file treeitem, LsTree.sha treeitem) +getConfigs :: Assistant Configs +getConfigs = S.fromList . map extract + <$> liftAnnex (inRepo $ LsTree.lsTreeFiles Annex.Branch.fullname files) + where + files = map fst configFilesActions + extract treeitem = (LsTree.file treeitem, LsTree.sha treeitem) diff --git a/Assistant/Threads/DaemonStatus.hs b/Assistant/Threads/DaemonStatus.hs index f3174c86f..946bf1b05 100644 --- a/Assistant/Threads/DaemonStatus.hs +++ b/Assistant/Threads/DaemonStatus.hs @@ -9,28 +9,21 @@ module Assistant.Threads.DaemonStatus where import Assistant.Common import Assistant.DaemonStatus -import Assistant.ThreadedMonad import Utility.ThreadScheduler import Utility.NotificationBroadcaster -thisThread :: ThreadName -thisThread = "DaemonStatus" - {- This writes the daemon status to disk, when it changes, but no more - frequently than once every ten minutes. -} -daemonStatusThread :: ThreadState -> DaemonStatusHandle -> NamedThread -daemonStatusThread st dstatus = thread $ do - notifier <- newNotificationHandle - =<< changeNotifier <$> getDaemonStatus dstatus +daemonStatusThread :: NamedThread +daemonStatusThread = NamedThread "DaemonStatus" $ do + notifier <- liftIO . newNotificationHandle + =<< changeNotifier <$> daemonStatus checkpoint - runEvery (Seconds tenMinutes) $ do - waitNotification notifier + runEvery (Seconds tenMinutes) <~> do + liftIO $ waitNotification notifier checkpoint - where - thread = NamedThread thisThread - checkpoint = do - status <- getDaemonStatus dstatus - file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile - writeDaemonStatusFile file status - + where + checkpoint = do + file <- liftAnnex $ fromRepo gitAnnexDaemonStatusFile + liftIO . writeDaemonStatusFile file =<< daemonStatus diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs index e415a7562..a766c5977 100644 --- a/Assistant/Threads/Merger.hs +++ b/Assistant/Threads/Merger.hs @@ -8,8 +8,6 @@ module Assistant.Threads.Merger where import Assistant.Common -import Assistant.ThreadedMonad -import Assistant.DaemonStatus import Assistant.TransferQueue import Assistant.BranchChange import Utility.DirWatcher @@ -24,36 +22,34 @@ thisThread = "Merger" {- This thread watches for changes to .git/refs/, and handles incoming - pushes. -} -mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> NamedThread -mergeThread st dstatus transferqueue branchchange = thread $ do - g <- runThreadState st gitRepo +mergeThread :: NamedThread +mergeThread = NamedThread "Merger" $ do + g <- liftAnnex gitRepo let dir = Git.localGitDir g </> "refs" - createDirectoryIfMissing True dir - let hook a = Just $ runHandler st dstatus transferqueue branchchange a + liftIO $ createDirectoryIfMissing True dir + let hook a = Just <$> asIO2 (runHandler a) + addhook <- hook onAdd + errhook <- hook onErr let hooks = mkWatchHooks - { addHook = hook onAdd - , errHook = hook onErr + { addHook = addhook + , errHook = errhook } - void $ watchDir dir (const False) hooks id - debug thisThread ["watching", dir] - where - thread = NamedThread thisThread + void $ liftIO $ watchDir dir (const False) hooks id + debug ["watching", dir] -type Handler = ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> FilePath -> Maybe FileStatus -> IO () +type Handler = FilePath -> Assistant () {- Runs an action handler. - - Exceptions are ignored, otherwise a whole thread could be crashed. -} -runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler st dstatus transferqueue branchchange handler file filestatus = void $ - either print (const noop) =<< tryIO go - where - go = handler st dstatus transferqueue branchchange file filestatus +runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant () +runHandler handler file _filestatus = + either (liftIO . print) (const noop) =<< tryIO <~> handler file {- Called when there's an error with inotify. -} onErr :: Handler -onErr _ _ _ _ msg _ = error msg +onErr msg = error msg {- Called when a new branch ref is written. - @@ -67,29 +63,29 @@ onErr _ _ _ _ msg _ = error msg - ran are merged in. -} onAdd :: Handler -onAdd st dstatus transferqueue branchchange file _ +onAdd file | ".lock" `isSuffixOf` file = noop | isAnnexBranch file = do - branchChanged branchchange - runThreadState st $ + branchChanged <<~ branchChangeHandle + transferqueue <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftAnnex $ whenM Annex.Branch.forceUpdate $ queueDeferredDownloads Later transferqueue dstatus - | "/synced/" `isInfixOf` file = runThreadState st $ do - mergecurrent =<< inRepo Git.Branch.current + | "/synced/" `isInfixOf` file = do + mergecurrent =<< liftAnnex (inRepo Git.Branch.current) | otherwise = noop - where - changedbranch = fileToBranch file - mergecurrent (Just current) - | equivBranches changedbranch current = do - liftIO $ debug thisThread - [ "merging" - , show changedbranch - , "into" - , show current - ] - void $ inRepo $ - Git.Merge.mergeNonInteractive changedbranch - mergecurrent _ = noop + where + changedbranch = fileToBranch file + mergecurrent (Just current) + | equivBranches changedbranch current = do + debug + [ "merging", show changedbranch + , "into", show current + ] + void $ liftAnnex $ inRepo $ + Git.Merge.mergeNonInteractive changedbranch + mergecurrent _ = noop equivBranches :: Git.Ref -> Git.Ref -> Bool equivBranches x y = base x == base y diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index afd1c223c..cb08071f5 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -11,11 +11,8 @@ module Assistant.Threads.MountWatcher where import Assistant.Common -import Assistant.ThreadedMonad import Assistant.DaemonStatus -import Assistant.ScanRemotes import Assistant.Sync -import Assistant.Pushes import qualified Annex import qualified Git import Utility.ThreadScheduler @@ -39,70 +36,70 @@ import qualified Control.Exception as E thisThread :: ThreadName thisThread = "MountWatcher" -mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread -mountWatcherThread st handle scanremotes pushnotifier = thread $ +mountWatcherThread :: NamedThread +mountWatcherThread = NamedThread "MountWatcher" $ #if WITH_DBUS - dbusThread st handle scanremotes pushnotifier + dbusThread #else - pollingThread st handle scanremotes pushnotifier + pollingThread #endif - where - thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () -dbusThread st dstatus scanremotes pushnotifier = - E.catch (runClient getSessionAddress go) onerr - where - go client = ifM (checkMountMonitor client) - ( do - {- Store the current mount points in an mvar, - - to be compared later. We could in theory - - work out the mount point from the dbus - - message, but this is easier. -} - mvar <- newMVar =<< currentMountPoints - forM_ mountChanged $ \matcher -> - listen client matcher $ \_event -> do - nowmounted <- currentMountPoints - wasmounted <- swapMVar mvar nowmounted - handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted - , do - runThreadState st $ - warning "No known volume monitor available through dbus; falling back to mtab polling" - pollinstead - ) - onerr :: E.SomeException -> IO () - onerr e = do - {- If the session dbus fails, the user probably - - logged out of their desktop. Even if they log - - back in, we won't have access to the dbus - - session key, so polling is the best that can be - - done in this situation. -} - runThreadState st $ - warning $ "dbus failed; falling back to mtab polling (" ++ show e ++ ")" - pollinstead - pollinstead = pollingThread st dstatus scanremotes pushnotifier +dbusThread :: Assistant () +dbusThread = do + runclient <- asIO go + r <- liftIO $ E.try $ runClient getSessionAddress runclient + either onerr (const noop) r + where + go client = ifM (checkMountMonitor client) + ( do + {- Store the current mount points in an MVar, to be + - compared later. We could in theory work out the + - mount point from the dbus message, but this is + - easier. -} + mvar <- liftIO $ newMVar =<< currentMountPoints + handleevent <- asIO $ \_event -> do + nowmounted <- liftIO $ currentMountPoints + wasmounted <- liftIO $ swapMVar mvar nowmounted + handleMounts wasmounted nowmounted + liftIO $ forM_ mountChanged $ \matcher -> + listen client matcher handleevent + , do + liftAnnex $ + warning "No known volume monitor available through dbus; falling back to mtab polling" + pollingThread + ) + onerr :: E.SomeException -> Assistant () + onerr e = do + {- If the session dbus fails, the user probably + - logged out of their desktop. Even if they log + - back in, we won't have access to the dbus + - session key, so polling is the best that can be + - done in this situation. -} + liftAnnex $ + warning $ "dbus failed; falling back to mtab polling (" ++ show e ++ ")" + pollingThread {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor mounts. If not, will attempt to start one. -} -checkMountMonitor :: Client -> IO Bool +checkMountMonitor :: Client -> Assistant Bool checkMountMonitor client = do running <- filter (`elem` usableservices) - <$> listServiceNames client + <$> liftIO (listServiceNames client) case running of - [] -> startOneService client startableservices + [] -> liftIO $ startOneService client startableservices (service:_) -> do - debug thisThread [ "Using running DBUS service" + debug [ "Using running DBUS service" , service , "to monitor mount events." ] return True - where - startableservices = [gvfs] - usableservices = startableservices ++ [kde] - gvfs = "org.gtk.Private.GduVolumeMonitor" - kde = "org.kde.DeviceNotifications" + where + startableservices = [gvfs] + usableservices = startableservices ++ [kde] + gvfs = "org.gtk.Private.GduVolumeMonitor" + kde = "org.kde.DeviceNotifications" startOneService :: Client -> [ServiceName] -> IO Bool startOneService _ [] = return False @@ -111,7 +108,7 @@ startOneService client (x:xs) = do [toVariant x, toVariant (0 :: Word32)] ifM (elem x <$> listServiceNames client) ( do - debug thisThread [ "Started DBUS service" + brokendebug thisThread [ "Started DBUS service" , x , "to monitor mount events." ] @@ -144,26 +141,29 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback] #endif -pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () -pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints +pollingThread :: Assistant () +pollingThread = go =<< liftIO currentMountPoints where go wasmounted = do - threadDelaySeconds (Seconds 10) - nowmounted <- currentMountPoints - handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted + liftIO $ threadDelaySeconds (Seconds 10) + nowmounted <- liftIO currentMountPoints + handleMounts wasmounted nowmounted go nowmounted -handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO () -handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted = - mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $ +handleMounts :: MountPoints -> MountPoints -> Assistant () +handleMounts wasmounted nowmounted = + mapM_ (handleMount . mnt_dir) $ S.toList $ newMountPoints wasmounted nowmounted -handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO () -handleMount st dstatus scanremotes pushnotifier dir = do - debug thisThread ["detected mount of", dir] - reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) - =<< filter (Git.repoIsLocal . Remote.repo) - <$> remotesUnder st dstatus dir +handleMount :: FilePath -> Assistant () +handleMount dir = do + debug ["detected mount of", dir] + rs <- filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder dir + d <- getAssistant id + liftIO $ + reconnectRemotes (threadName d) (threadState d) + (daemonStatusHandle d) (scanRemoteMap d) + (Just $ pushNotifier d) rs {- Finds remotes located underneath the mount point. - @@ -173,15 +173,15 @@ handleMount st dstatus scanremotes pushnotifier dir = do - at startup time, or may have changed (it could even be a different - repository at the same remote location..) -} -remotesUnder :: ThreadState -> DaemonStatusHandle -> FilePath -> IO [Remote] -remotesUnder st dstatus dir = runThreadState st $ do - repotop <- fromRepo Git.repoPath - rs <- remoteList - pairs <- mapM (checkremote repotop) rs +remotesUnder :: FilePath -> Assistant [Remote] +remotesUnder dir = do + repotop <- liftAnnex $ fromRepo Git.repoPath + rs <- liftAnnex remoteList + pairs <- liftAnnex $ mapM (checkremote repotop) rs let (waschanged, rs') = unzip pairs when (any id waschanged) $ do - Annex.changeState $ \s -> s { Annex.remotes = rs' } - updateSyncRemotes dstatus + liftAnnex $ Annex.changeState $ \s -> s { Annex.remotes = rs' } + liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle return $ map snd $ filter fst pairs where checkremote repotop r = case Remote.localpath r of diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index ed64541c3..2af880e02 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -11,9 +11,6 @@ module Assistant.Threads.NetWatcher where import Assistant.Common -import Assistant.ThreadedMonad -import Assistant.DaemonStatus -import Assistant.ScanRemotes import Assistant.Sync import Assistant.Pushes import Utility.ThreadScheduler @@ -29,72 +26,67 @@ import Data.Word (Word32) #warning Building without dbus support; will poll for network connection changes #endif -thisThread :: ThreadName -thisThread = "NetWatcher" - -netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +netWatcherThread :: NamedThread #if WITH_DBUS -netWatcherThread st dstatus scanremotes pushnotifier = thread $ - dbusThread st dstatus scanremotes pushnotifier +netWatcherThread = thread dbusThread #else -netWatcherThread _ _ _ _ = thread noop +netWatcherThread = thread noop #endif - where - thread = NamedThread thisThread + where + thread = NamedThread "NetWatcher" {- This is a fallback for when dbus cannot be used to detect - network connection changes, but it also ensures that - any networked remotes that may have not been routable for a - while (despite the local network staying up), are synced with - periodically. -} -netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread -netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $ - runEvery (Seconds 3600) $ - handleConnection st dstatus scanremotes pushnotifier - where - thread = NamedThread thisThread +netWatcherFallbackThread :: NamedThread +netWatcherFallbackThread = NamedThread "NetWatcherFallback" $ + runEvery (Seconds 3600) <~> handleConnection #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () -dbusThread st dstatus scanremotes pushnotifier = - persistentClient getSystemAddress () onerr go - where - go client = ifM (checkNetMonitor client) - ( do - listenNMConnections client handleconn - listenWicdConnections client handleconn - , do - runThreadState st $ - warning "No known network monitor available through dbus; falling back to polling" - ) - handleconn = do - debug thisThread ["detected network connection"] - notifyRestart pushnotifier - handleConnection st dstatus scanremotes pushnotifier - onerr e _ = do - runThreadState st $ - warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")" - {- Wait, in hope that dbus will come back -} - threadDelaySeconds (Seconds 60) +dbusThread :: Assistant () +dbusThread = do + handleerr <- asIO2 onerr + runclient <- asIO go + liftIO $ persistentClient getSystemAddress () handleerr runclient + where + go client = ifM (checkNetMonitor client) + ( do + listenNMConnections client <~> handleconn + listenWicdConnections client <~> handleconn + , do + liftAnnex $ + warning "No known network monitor available through dbus; falling back to polling" + ) + handleconn = do + debug ["detected network connection"] + notifyRestart <<~ pushNotifier + handleConnection + onerr e _ = do + liftAnnex $ + warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")" + {- Wait, in hope that dbus will come back -} + liftIO $ threadDelaySeconds (Seconds 60) {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor network connections. -} -checkNetMonitor :: Client -> IO Bool +checkNetMonitor :: Client -> Assistant Bool checkNetMonitor client = do - running <- filter (`elem` [networkmanager, wicd]) + running <- liftIO $ filter (`elem` [networkmanager, wicd]) <$> listServiceNames client case running of [] -> return False (service:_) -> do - debug thisThread [ "Using running DBUS service" + debug [ "Using running DBUS service" , service , "to monitor network connection events." ] return True - where - networkmanager = "org.freedesktop.NetworkManager" - wicd = "org.wicd.daemon" + where + networkmanager = "org.freedesktop.NetworkManager" + wicd = "org.wicd.daemon" {- Listens for new NetworkManager connections. -} listenNMConnections :: Client -> IO () -> IO () @@ -102,18 +94,18 @@ listenNMConnections client callback = listen client matcher $ \event -> when (Just True == anyM activeconnection (signalBody event)) $ callback - where - matcher = matchAny - { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active" - , matchMember = Just "PropertiesChanged" - } - nm_connection_activated = toVariant (2 :: Word32) - nm_state_key = toVariant ("State" :: String) - activeconnection v = do - m <- fromVariant v - vstate <- lookup nm_state_key $ dictionaryItems m - state <- fromVariant vstate - return $ state == nm_connection_activated + where + matcher = matchAny + { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active" + , matchMember = Just "PropertiesChanged" + } + nm_connection_activated = toVariant (2 :: Word32) + nm_state_key = toVariant ("State" :: String) + activeconnection v = do + m <- fromVariant v + vstate <- lookup nm_state_key $ dictionaryItems m + state <- fromVariant vstate + return $ state == nm_connection_activated {- Listens for new Wicd connections. -} listenWicdConnections :: Client -> IO () -> IO () @@ -121,21 +113,23 @@ listenWicdConnections client callback = listen client matcher $ \event -> when (any (== wicd_success) (signalBody event)) $ callback - where - matcher = matchAny - { matchInterface = Just "org.wicd.daemon" - , matchMember = Just "ConnectResultsSent" - } - wicd_success = toVariant ("success" :: String) + where + matcher = matchAny + { matchInterface = Just "org.wicd.daemon" + , matchMember = Just "ConnectResultsSent" + } + wicd_success = toVariant ("success" :: String) #endif -handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () -handleConnection st dstatus scanremotes pushnotifier = - reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) - =<< networkRemotes st +handleConnection :: Assistant () +handleConnection = do + d <- getAssistant id + liftIO . reconnectRemotes (threadName d) (threadState d) + (daemonStatusHandle d) (scanRemoteMap d) (Just $ pushNotifier d) + =<< networkRemotes {- Finds network remotes. -} -networkRemotes :: ThreadState -> IO [Remote] -networkRemotes st = runThreadState st $ +networkRemotes :: Assistant [Remote] +networkRemotes = liftAnnex $ filter (isNothing . Remote.localpath) <$> remoteList diff --git a/Assistant/Threads/PairListener.hs b/Assistant/Threads/PairListener.hs index 9875dcb8a..116cc0fa1 100644 --- a/Assistant/Threads/PairListener.hs +++ b/Assistant/Threads/PairListener.hs @@ -28,7 +28,7 @@ thisThread :: ThreadName thisThread = "PairListener" pairListenerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> UrlRenderer -> NamedThread -pairListenerThread st dstatus scanremotes urlrenderer = thread $ withSocketsDo $ +pairListenerThread st dstatus scanremotes urlrenderer = thread $ liftIO $ withSocketsDo $ runEvery (Seconds 1) $ void $ tryIO $ do sock <- getsock go sock [] [] diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs index dc7099e3d..d19369b8d 100644 --- a/Assistant/Threads/PushNotifier.hs +++ b/Assistant/Threads/PushNotifier.hs @@ -12,7 +12,6 @@ module Assistant.Threads.PushNotifier where import Assistant.Common import Assistant.XMPP -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Pushes import Assistant.Sync @@ -25,56 +24,56 @@ import qualified Data.Set as S import qualified Git.Branch import Data.Time.Clock -thisThread :: ThreadName -thisThread = "PushNotifier" +pushNotifierThread :: NamedThread +pushNotifierThread = NamedThread "PushNotifier" $ do + iodebug <- asIO debug + iopull <- asIO pull + pn <- getAssistant pushNotifier + controllerThread pn <~> xmppClient pn iodebug iopull controllerThread :: PushNotifier -> IO () -> IO () -controllerThread pushnotifier a = forever $ do - tid <- forkIO a +controllerThread pushnotifier xmppclient = forever $ do + tid <- forkIO xmppclient waitRestart pushnotifier killThread tid -pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread -pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ - controllerThread pushnotifier $ do - v <- runThreadState st $ getXMPPCreds - case v of - Nothing -> noop - Just c -> loop c =<< getCurrentTime - where - loop c starttime = do - void $ connectXMPP c $ \jid -> do - fulljid <- bindJID jid - liftIO $ debug thisThread ["XMPP connected", show fulljid] - putStanza $ gitAnnexPresence gitAnnexSignature - s <- getSession - _ <- liftIO $ forkIO $ void $ runXMPP s $ - receivenotifications - sendnotifications - now <- getCurrentTime - if diffUTCTime now starttime > 300 - then do - debug thisThread ["XMPP connection lost; reconnecting"] - loop c now - else do - debug thisThread ["XMPP connection failed; will retry"] - threadDelaySeconds (Seconds 300) - loop c =<< getCurrentTime - - sendnotifications = forever $ do - us <- liftIO $ waitPush pushnotifier - putStanza $ gitAnnexPresence $ encodePushNotification us - - receivenotifications = forever $ do - s <- getStanza - liftIO $ debug thisThread ["received XMPP:", show s] - case s of - ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) -> - liftIO $ pull st dstatus $ - concat $ catMaybes $ - map decodePushNotification $ - presencePayloads p - _ -> noop +xmppClient :: PushNotifier -> ([String] -> IO ()) -> ([UUID] -> IO ()) -> Assistant () +xmppClient pushnotifier iodebug iopull = do + v <- liftAnnex getXMPPCreds + case v of + Nothing -> noop + Just c -> liftIO $ loop c =<< getCurrentTime + where + loop c starttime = do + void $ connectXMPP c $ \jid -> do + fulljid <- bindJID jid + liftIO $ iodebug ["XMPP connected", show fulljid] + putStanza $ gitAnnexPresence gitAnnexSignature + s <- getSession + _ <- liftIO $ forkIO $ void $ runXMPP s $ + receivenotifications + sendnotifications + now <- getCurrentTime + if diffUTCTime now starttime > 300 + then do + iodebug ["XMPP connection lost; reconnecting"] + loop c now + else do + iodebug ["XMPP connection failed; will retry"] + threadDelaySeconds (Seconds 300) + loop c =<< getCurrentTime + sendnotifications = forever $ do + us <- liftIO $ waitPush pushnotifier + putStanza $ gitAnnexPresence $ encodePushNotification us + receivenotifications = forever $ do + s <- getStanza + liftIO $ iodebug ["received XMPP:", show s] + case s of + ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) -> + liftIO $ iopull $ concat $ catMaybes $ + map decodePushNotification $ + presencePayloads p + _ -> noop {- We only pull from one remote out of the set listed in the push - notification, as an optimisation. @@ -89,18 +88,18 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ - fully up-to-date. If that happens, the pushRetryThread will come along - and retry the push, and we'll get another notification once it succeeds, - and pull again. -} -pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO () -pull _ _ [] = noop -pull st dstatus us = do - rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus - debug thisThread $ "push notification for" : - map (fromUUID . Remote.uuid ) rs - pullone rs =<< runThreadState st (inRepo Git.Branch.current) - where - matching r = Remote.uuid r `S.member` s - s = S.fromList us +pull :: [UUID] -> Assistant () +pull [] = noop +pull us = do + rs <- filter matching . syncRemotes <$> daemonStatus + debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs + st <- getAssistant threadState + liftIO . pullone st rs =<< liftAnnex (inRepo Git.Branch.current) + where + matching r = Remote.uuid r `S.member` s + s = S.fromList us - pullone [] _ = noop - pullone (r:rs) branch = - unlessM (all id . fst <$> manualPull st branch [r]) $ - pullone rs branch + pullone _ [] _ = noop + pullone st (r:rs) branch = + unlessM (all id . fst <$> manualPull st branch [r]) $ + pullone st rs branch diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 671a620b4..811314651 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -11,7 +11,6 @@ import Assistant.Common import Assistant.Commits import Assistant.Pushes import Assistant.Alert -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Sync import Utility.ThreadScheduler @@ -24,52 +23,49 @@ thisThread :: ThreadName thisThread = "Pusher" {- This thread retries pushes that failed before. -} -pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread -pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do +pushRetryThread :: NamedThread +pushRetryThread = NamedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do -- We already waited half an hour, now wait until there are failed -- pushes to retry. - topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) + pushmap <- getAssistant failedPushMap + topush <- liftIO $ getFailedPushesBefore pushmap (fromIntegral halfhour) unless (null topush) $ do - debug thisThread - [ "retrying" - , show (length topush) - , "failed pushes" - ] - now <- getCurrentTime - void $ alertWhile dstatus (pushRetryAlert topush) $ + debug ["retrying", show (length topush), "failed pushes"] + now <- liftIO $ getCurrentTime + st <- getAssistant threadState + pushnotifier <- getAssistant pushNotifier + dstatus <- getAssistant daemonStatusHandle + void $ liftIO $ alertWhile dstatus (pushRetryAlert topush) $ pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush where halfhour = 1800 - thread = NamedThread thisThread {- This thread pushes git commits out to remotes soon after they are made. -} -pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread -pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do +pushThread :: NamedThread +pushThread = NamedThread "Pusher" $ runEvery (Seconds 2) <~> do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made - commits <- getCommits commitchan + commits <- getCommits <<~ commitChan -- Now see if now's a good time to push. if shouldPush commits then do - remotes <- filter pushable . syncRemotes - <$> getDaemonStatus dstatus + remotes <- filter pushable . syncRemotes <$> daemonStatus unless (null remotes) $ do - now <- getCurrentTime - void $ alertWhile dstatus (pushAlert remotes) $ + now <- liftIO $ getCurrentTime + st <- getAssistant threadState + pushmap <- getAssistant failedPushMap + pushnotifier <- getAssistant pushNotifier + dstatus <- getAssistant daemonStatusHandle + void $ liftIO $ alertWhile dstatus (pushAlert remotes) $ pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes else do - debug thisThread - [ "delaying push of" - , show (length commits) - , "commits" - ] - refillCommits commitchan commits - where - thread = NamedThread thisThread - pushable r - | Remote.specialRemote r = False - | Remote.readonly r = False - | otherwise = True + debug ["delaying push of", show (length commits), "commits"] + flip refillCommits commits <<~ commitChan + where + pushable r + | Remote.specialRemote r = False + | Remote.readonly r = False + | otherwise = True {- Decide if now is a good time to push to remotes. - diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs index 912270090..d92c6c394 100644 --- a/Assistant/Threads/SanityChecker.hs +++ b/Assistant/Threads/SanityChecker.hs @@ -11,60 +11,56 @@ module Assistant.Threads.SanityChecker ( import Assistant.Common import Assistant.DaemonStatus -import Assistant.ThreadedMonad -import Assistant.Changes import Assistant.Alert -import Assistant.TransferQueue import qualified Git.LsFiles import Utility.ThreadScheduler import qualified Assistant.Threads.Watcher as Watcher import Data.Time.Clock.POSIX -thisThread :: ThreadName -thisThread = "SanityChecker" - {- This thread wakes up occasionally to make sure the tree is in good shape. -} -sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread -sanityCheckerThread st dstatus transferqueue changechan = thread $ forever $ do - waitForNextCheck dstatus +sanityCheckerThread :: NamedThread +sanityCheckerThread = NamedThread "SanityChecker" $ forever $ do + waitForNextCheck - debug thisThread ["starting sanity check"] + debug ["starting sanity check"] - void $ alertWhile dstatus sanityCheckAlert go + dstatus <- getAssistant daemonStatusHandle + void $ alertWhile dstatus sanityCheckAlert <~> go - debug thisThread ["sanity check complete"] - where - thread = NamedThread thisThread - go = do - modifyDaemonStatus_ dstatus $ \s -> s - { sanityCheckRunning = True } + debug ["sanity check complete"] + where + go = do + dstatus <- getAssistant daemonStatusHandle + liftIO $ modifyDaemonStatus_ dstatus $ \s -> s + { sanityCheckRunning = True } + + now <- liftIO $ getPOSIXTime -- before check started + r <- either showerr return =<< tryIO <~> check - now <- getPOSIXTime -- before check started - r <- catchIO (check st dstatus transferqueue changechan) - $ \e -> do - runThreadState st $ warning $ show e - return False + liftIO $ modifyDaemonStatus_ dstatus $ \s -> s + { sanityCheckRunning = False + , lastSanityCheck = Just now + } - modifyDaemonStatus_ dstatus $ \s -> s - { sanityCheckRunning = False - , lastSanityCheck = Just now - } + return r - return r + showerr e = do + liftAnnex $ warning $ show e + return False {- Only run one check per day, from the time of the last check. -} -waitForNextCheck :: DaemonStatusHandle -> IO () -waitForNextCheck dstatus = do - v <- lastSanityCheck <$> getDaemonStatus dstatus - now <- getPOSIXTime - threadDelaySeconds $ Seconds $ calcdelay now v - where - calcdelay _ Nothing = oneDay - calcdelay now (Just lastcheck) - | lastcheck < now = max oneDay $ - oneDay - truncate (now - lastcheck) - | otherwise = oneDay +waitForNextCheck :: Assistant () +waitForNextCheck = do + v <- lastSanityCheck <$> daemonStatus + now <- liftIO getPOSIXTime + liftIO $ threadDelaySeconds $ Seconds $ calcdelay now v + where + calcdelay _ Nothing = oneDay + calcdelay now (Just lastcheck) + | lastcheck < now = max oneDay $ + oneDay - truncate (now - lastcheck) + | otherwise = oneDay oneDay :: Int oneDay = 24 * 60 * 60 @@ -72,29 +68,27 @@ oneDay = 24 * 60 * 60 {- It's important to stay out of the Annex monad as much as possible while - running potentially expensive parts of this check, since remaining in it - will block the watcher. -} -check :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO Bool -check st dstatus transferqueue changechan = do - g <- runThreadState st gitRepo +check :: Assistant Bool +check = do + g <- liftAnnex gitRepo -- Find old unstaged symlinks, and add them to git. - (unstaged, cleanup) <- Git.LsFiles.notInRepo False ["."] g - now <- getPOSIXTime + (unstaged, cleanup) <- liftIO $ Git.LsFiles.notInRepo False ["."] g + now <- liftIO $ getPOSIXTime forM_ unstaged $ \file -> do - ms <- catchMaybeIO $ getSymbolicLinkStatus file + ms <- liftIO $ catchMaybeIO $ getSymbolicLinkStatus file case ms of Just s | toonew (statusChangeTime s) now -> noop - | isSymbolicLink s -> - addsymlink file ms + | isSymbolicLink s -> addsymlink file ms _ -> noop - void cleanup + liftIO $ void cleanup return True - where - toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime) - slop = fromIntegral tenMinutes - insanity msg = do - runThreadState st $ warning msg - void $ addAlert dstatus $ sanityCheckFixAlert msg - addsymlink file s = do - Watcher.runHandler thisThread st dstatus - transferqueue changechan - Watcher.onAddSymlink file s - insanity $ "found unstaged symlink: " ++ file + where + toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime) + slop = fromIntegral tenMinutes + insanity msg = do + liftAnnex $ warning msg + dstatus <- getAssistant daemonStatusHandle + liftIO $ void $ addAlert dstatus $ sanityCheckFixAlert msg + addsymlink file s = do + Watcher.runHandler Watcher.onAddSymlink file s + insanity $ "found unstaged symlink: " ++ file diff --git a/Assistant/Threads/TransferPoller.hs b/Assistant/Threads/TransferPoller.hs index afead63ec..6f54336bb 100644 --- a/Assistant/Threads/TransferPoller.hs +++ b/Assistant/Threads/TransferPoller.hs @@ -8,7 +8,6 @@ module Assistant.Threads.TransferPoller where import Assistant.Common -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Logs.Transfer import Utility.NotificationBroadcaster @@ -17,46 +16,42 @@ import qualified Assistant.Threads.TransferWatcher as TransferWatcher import Control.Concurrent import qualified Data.Map as M -thisThread :: ThreadName -thisThread = "TransferPoller" - {- This thread polls the status of ongoing transfers, determining how much - of each transfer is complete. -} -transferPollerThread :: ThreadState -> DaemonStatusHandle -> NamedThread -transferPollerThread st dstatus = thread $ do - g <- runThreadState st gitRepo - tn <- newNotificationHandle =<< - transferNotifier <$> getDaemonStatus dstatus +transferPollerThread :: NamedThread +transferPollerThread = NamedThread "TransferPoller" $ do + g <- liftAnnex gitRepo + tn <- liftIO . newNotificationHandle =<< + transferNotifier <$> daemonStatus forever $ do - threadDelay 500000 -- 0.5 seconds - ts <- currentTransfers <$> getDaemonStatus dstatus + liftIO $ threadDelay 500000 -- 0.5 seconds + ts <- currentTransfers <$> daemonStatus if M.null ts - then waitNotification tn -- block until transfers running + -- block until transfers running + then liftIO $ waitNotification tn else mapM_ (poll g) $ M.toList ts - where - thread = NamedThread thisThread - poll g (t, info) - {- Downloads are polled by checking the size of the - - temp file being used for the transfer. -} - | transferDirection t == Download = do - let f = gitAnnexTmpLocation (transferKey t) g - sz <- catchMaybeIO $ - fromIntegral . fileSize - <$> getFileStatus f - newsize t info sz - {- Uploads don't need to be polled for when the - - TransferWatcher thread can track file - - modifications. -} - | TransferWatcher.watchesTransferSize = noop - {- Otherwise, this code polls the upload progress - - by reading the transfer info file. -} - | otherwise = do - let f = transferFile t g - mi <- catchDefaultIO Nothing $ - readTransferInfoFile Nothing f - maybe noop (newsize t info . bytesComplete) mi - newsize t info sz - | bytesComplete info /= sz && isJust sz = - alterTransferInfo dstatus t $ - \i -> i { bytesComplete = sz } - | otherwise = noop + where + poll g (t, info) + {- Downloads are polled by checking the size of the + - temp file being used for the transfer. -} + | transferDirection t == Download = do + let f = gitAnnexTmpLocation (transferKey t) g + sz <- liftIO $ catchMaybeIO $ + fromIntegral . fileSize <$> getFileStatus f + newsize t info sz + {- Uploads don't need to be polled for when the TransferWatcher + - thread can track file modifications. -} + | TransferWatcher.watchesTransferSize = noop + {- Otherwise, this code polls the upload progress + - by reading the transfer info file. -} + | otherwise = do + let f = transferFile t g + mi <- liftIO $ catchDefaultIO Nothing $ + readTransferInfoFile Nothing f + maybe noop (newsize t info . bytesComplete) mi + + newsize t info sz + | bytesComplete info /= sz && isJust sz = + alterTransferInfo t (\i -> i { bytesComplete = sz }) + <<~ daemonStatusHandle + | otherwise = noop diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs index 631c36b02..8c46a79fa 100644 --- a/Assistant/Threads/TransferScanner.hs +++ b/Assistant/Threads/TransferScanner.hs @@ -10,7 +10,6 @@ module Assistant.Threads.TransferScanner where import Assistant.Common import Assistant.ScanRemotes import Assistant.TransferQueue -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Alert import Assistant.Drop @@ -27,64 +26,64 @@ import Annex.Wanted import qualified Data.Set as S -thisThread :: ThreadName -thisThread = "TransferScanner" - {- This thread waits until a remote needs to be scanned, to find transfers - that need to be made, to keep data in sync. -} -transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> NamedThread -transferScannerThread st dstatus scanremotes transferqueue = thread $ do +transferScannerThread :: NamedThread +transferScannerThread = NamedThread "TransferScanner" $ do startupScan go S.empty - where - thread = NamedThread thisThread - go scanned = do - threadDelaySeconds (Seconds 2) - (rs, infos) <- unzip <$> getScanRemote scanremotes - if any fullScan infos || any (`S.notMember` scanned) rs - then do - expensiveScan st dstatus transferqueue rs - go $ scanned `S.union` S.fromList rs - else do - mapM_ (failedTransferScan st dstatus transferqueue) rs - go scanned - {- All available remotes are scanned in full on startup, - - for multiple reasons, including: - - - - * This may be the first run, and there may be remotes - - already in place, that need to be synced. - - * We may have run before, and scanned a remote, but - - only been in a subdirectory of the git remote, and so - - not synced it all. - - * We may have run before, and had transfers queued, - - and then the system (or us) crashed, and that info was - - lost. - -} - startupScan = addScanRemotes scanremotes True - =<< syncRemotes <$> getDaemonStatus dstatus + where + go scanned = do + liftIO $ threadDelaySeconds (Seconds 2) + (rs, infos) <- unzip <$> getScanRemote <<~ scanRemoteMap + if any fullScan infos || any (`S.notMember` scanned) rs + then do + expensiveScan rs + go $ scanned `S.union` S.fromList rs + else do + mapM_ failedTransferScan rs + go scanned + {- All available remotes are scanned in full on startup, + - for multiple reasons, including: + - + - * This may be the first run, and there may be remotes + - already in place, that need to be synced. + - * We may have run before, and scanned a remote, but + - only been in a subdirectory of the git remote, and so + - not synced it all. + - * We may have run before, and had transfers queued, + - and then the system (or us) crashed, and that info was + - lost. + -} + startupScan = do + scanremotes <- getAssistant scanRemoteMap + liftIO . addScanRemotes scanremotes True + =<< syncRemotes <$> daemonStatus {- This is a cheap scan for failed transfers involving a remote. -} -failedTransferScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO () -failedTransferScan st dstatus transferqueue r = do - failed <- runThreadState st $ getFailedTransfers (Remote.uuid r) - runThreadState st $ mapM_ removeFailedTransfer $ map fst failed +failedTransferScan :: Remote -> Assistant () +failedTransferScan r = do + failed <- liftAnnex $ getFailedTransfers (Remote.uuid r) + liftAnnex $ mapM_ removeFailedTransfer $ map fst failed mapM_ retry failed - where - retry (t, info) - | transferDirection t == Download = do - {- Check if the remote still has the key. - - If not, relies on the expensiveScan to - - get it queued from some other remote. -} - whenM (runThreadState st $ remoteHas r $ transferKey t) $ - requeue t info - | otherwise = do - {- The Transferrer checks when uploading - - that the remote doesn't already have the - - key, so it's not redundantly checked - - here. -} + where + retry (t, info) + | transferDirection t == Download = do + {- Check if the remote still has the key. + - If not, relies on the expensiveScan to + - get it queued from some other remote. -} + whenM (liftAnnex $ remoteHas r $ transferKey t) $ requeue t info - requeue t info = queueTransferWhenSmall + | otherwise = do + {- The Transferrer checks when uploading + - that the remote doesn't already have the + - key, so it's not redundantly checked here. -} + requeue t info + requeue t info = do + transferqueue <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ queueTransferWhenSmall transferqueue dstatus (associatedFile info) t r {- This is a expensive scan through the full git work tree, finding @@ -98,42 +97,45 @@ failedTransferScan st dstatus transferqueue r = do - TODO: It would be better to first drop as much as we can, before - transferring much, to minimise disk use. -} -expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> [Remote] -> IO () -expensiveScan st dstatus transferqueue rs = unless onlyweb $ do - liftIO $ debug thisThread ["starting scan of", show visiblers] - void $ alertWhile dstatus (scanAlert visiblers) $ do - g <- runThreadState st gitRepo - (files, cleanup) <- LsFiles.inRepo [] g +expensiveScan :: [Remote] -> Assistant () +expensiveScan rs = unless onlyweb $ do + debug ["starting scan of", show visiblers] + dstatus <- getAssistant daemonStatusHandle + void $ alertWhile dstatus (scanAlert visiblers) <~> do + g <- liftAnnex gitRepo + (files, cleanup) <- liftIO $ LsFiles.inRepo [] g forM_ files $ \f -> do - ts <- runThreadState st $ - ifAnnexed f (findtransfers f) (return []) + ts <- liftAnnex $ + ifAnnexed f (findtransfers dstatus f) (return []) mapM_ (enqueue f) ts - void cleanup + void $ liftIO cleanup return True - liftIO $ debug thisThread ["finished scan of", show visiblers] - where - onlyweb = all (== webUUID) $ map Remote.uuid rs - visiblers = let rs' = filter (not . Remote.readonly) rs - in if null rs' then rs else rs' - enqueue f (r, t) = do - debug thisThread ["queuing", show t] - queueTransferWhenSmall transferqueue dstatus (Just f) t r - findtransfers f (key, _) = do - locs <- loggedLocations key - {- The syncable remotes may have changed since this - - scan began. -} - syncrs <- liftIO $ syncRemotes <$> getDaemonStatus dstatus - present <- inAnnex key + debug ["finished scan of", show visiblers] + where + onlyweb = all (== webUUID) $ map Remote.uuid rs + visiblers = let rs' = filter (not . Remote.readonly) rs + in if null rs' then rs else rs' + enqueue f (r, t) = do + debug ["queuing", show t] + transferqueue <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ queueTransferWhenSmall transferqueue dstatus (Just f) t r + findtransfers dstatus f (key, _) = do + locs <- loggedLocations key + {- The syncable remotes may have changed since this + - scan began. -} + syncrs <- liftIO $ syncRemotes <$> getDaemonStatus dstatus + present <- inAnnex key - handleDrops' locs syncrs present key (Just f) + handleDrops' locs syncrs present key (Just f) - let slocs = S.fromList locs - let use a = return $ catMaybes $ map (a key slocs) syncrs - if present - then filterM (wantSend (Just f) . Remote.uuid . fst) - =<< use (genTransfer Upload False) - else ifM (wantGet $ Just f) - ( use (genTransfer Download True) , return [] ) + let slocs = S.fromList locs + let use a = return $ catMaybes $ map (a key slocs) syncrs + if present + then filterM (wantSend (Just f) . Remote.uuid . fst) + =<< use (genTransfer Upload False) + else ifM (wantGet $ Just f) + ( use (genTransfer Download True) , return [] ) genTransfer :: Direction -> Bool -> Key -> S.Set UUID -> Remote -> Maybe (Remote, Transfer) genTransfer direction want key slocs r diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index 168ff2688..ad341b00a 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -8,7 +8,6 @@ module Assistant.Threads.TransferWatcher where import Assistant.Common -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.TransferQueue import Assistant.Drop @@ -20,75 +19,69 @@ import qualified Remote import Control.Concurrent -thisThread :: ThreadName -thisThread = "TransferWatcher" - {- This thread watches for changes to the gitAnnexTransferDir, - and updates the DaemonStatus's map of ongoing transfers. -} -transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> NamedThread -transferWatcherThread st dstatus transferqueue = thread $ do - g <- runThreadState st gitRepo - let dir = gitAnnexTransferDir g - createDirectoryIfMissing True dir - let hook a = Just $ runHandler st dstatus transferqueue a +transferWatcherThread :: NamedThread +transferWatcherThread = NamedThread "TransferWatcher" $ do + dir <- liftAnnex $ gitAnnexTransferDir <$> gitRepo + liftIO $ createDirectoryIfMissing True dir + let hook a = Just <$> asIO2 (runHandler a) + addhook <- hook onAdd + delhook <- hook onDel + modifyhook <- hook onModify + errhook <- hook onErr let hooks = mkWatchHooks - { addHook = hook onAdd - , delHook = hook onDel - , modifyHook = hook onModify - , errHook = hook onErr + { addHook = addhook + , delHook = delhook + , modifyHook = modifyhook + , errHook = errhook } - void $ watchDir dir (const False) hooks id - debug thisThread ["watching for transfers"] - where - thread = NamedThread thisThread + void $ liftIO $ watchDir dir (const False) hooks id + debug ["watching for transfers"] -type Handler = ThreadState -> DaemonStatusHandle -> TransferQueue -> FilePath -> Maybe FileStatus -> IO () +type Handler = FilePath -> Assistant () {- Runs an action handler. - - Exceptions are ignored, otherwise a whole thread could be crashed. -} -runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler st dstatus transferqueue handler file filestatus = void $ - either print (const noop) =<< tryIO go - where - go = handler st dstatus transferqueue file filestatus +runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant () +runHandler handler file _filestatus = + either (liftIO . print) (const noop) =<< tryIO <~> handler file {- Called when there's an error with inotify. -} onErr :: Handler -onErr _ _ _ msg _ = error msg +onErr msg = error msg {- Called when a new transfer information file is written. -} onAdd :: Handler -onAdd st dstatus _ file _ = case parseTransferFile file of +onAdd file = case parseTransferFile file of Nothing -> noop - Just t -> go t =<< runThreadState st (checkTransfer t) - where - go _ Nothing = noop -- transfer already finished - go t (Just info) = do - debug thisThread - [ "transfer starting:" - , show t - ] - r <- headMaybe . filter (sameuuid t) - <$> runThreadState st Remote.remoteList - updateTransferInfo dstatus t info - { transferRemote = r } - sameuuid t r = Remote.uuid r == transferUUID t + Just t -> go t =<< liftAnnex (checkTransfer t) + where + go _ Nothing = noop -- transfer already finished + go t (Just info) = do + debug [ "transfer starting:", show t] + r <- headMaybe . filter (sameuuid t) + <$> liftAnnex Remote.remoteList + dstatus <- getAssistant daemonStatusHandle + liftIO $ updateTransferInfo dstatus t info { transferRemote = r } + sameuuid t r = Remote.uuid r == transferUUID t {- Called when a transfer information file is updated. - - The only thing that should change in the transfer info is the - bytesComplete, so that's the only thing updated in the DaemonStatus. -} onModify :: Handler -onModify _ dstatus _ file _ = do +onModify file = do case parseTransferFile file of Nothing -> noop - Just t -> go t =<< readTransferInfoFile Nothing file + Just t -> go t =<< liftIO (readTransferInfoFile Nothing file) where go _ Nothing = noop - go t (Just newinfo) = alterTransferInfo dstatus t $ \info -> - info { bytesComplete = bytesComplete newinfo } + go t (Just newinfo) = alterTransferInfo t + (\i -> i { bytesComplete = bytesComplete newinfo }) + <<~ daemonStatusHandle {- This thread can only watch transfer sizes when the DirWatcher supports - tracking modificatons to files. -} @@ -97,21 +90,19 @@ watchesTransferSize = modifyTracked {- Called when a transfer information file is removed. -} onDel :: Handler -onDel st dstatus transferqueue file _ = case parseTransferFile file of +onDel file = case parseTransferFile file of Nothing -> noop Just t -> do - debug thisThread - [ "transfer finishing:" - , show t - ] - minfo <- removeTransfer dstatus t + debug [ "transfer finishing:", show t] + minfo <- flip removeTransfer t <<~ daemonStatusHandle - void $ forkIO $ do + finished <- asIO2 finishedTransfer + void $ liftIO $ forkIO $ do {- XXX race workaround delay. The location - log needs to be updated before finishedTransfer - runs. -} threadDelay 10000000 -- 10 seconds - finishedTransfer st dstatus transferqueue t minfo + finished t minfo {- Queue uploads of files we successfully downloaded, spreading them - out to other reachable remotes. @@ -122,15 +113,19 @@ onDel st dstatus transferqueue file _ = case parseTransferFile file of - Uploading a file may cause the local repo, or some other remote to not - want it; handle that too. -} -finishedTransfer :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Transfer -> Maybe TransferInfo -> IO () -finishedTransfer st dstatus transferqueue t (Just info) - | transferDirection t == Download = runThreadState st $ - whenM (inAnnex $ transferKey t) $ do - handleDrops dstatus False +finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant () +finishedTransfer t (Just info) + | transferDirection t == Download = + whenM (liftAnnex $ inAnnex $ transferKey t) $ do + dstatus <- getAssistant daemonStatusHandle + transferqueue <- getAssistant transferQueue + liftAnnex $ handleDrops dstatus False (transferKey t) (associatedFile info) - queueTransfersMatching (/= transferUUID t) + liftAnnex $ queueTransfersMatching (/= transferUUID t) Later transferqueue dstatus (transferKey t) (associatedFile info) Upload - | otherwise = runThreadState st $ - handleDrops dstatus True (transferKey t) (associatedFile info) -finishedTransfer _ _ _ _ _ = noop + | otherwise = do + dstatus <- getAssistant daemonStatusHandle + liftAnnex $ handleDrops dstatus True (transferKey t) (associatedFile info) +finishedTransfer _ _ = noop + diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index 30d736073..2e880bef9 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -32,7 +32,7 @@ maxTransfers = 1 {- Dispatches transfers from the queue. -} transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> CommitChan -> NamedThread -transfererThread st dstatus transferqueue slots commitchan = thread $ go =<< readProgramFile +transfererThread st dstatus transferqueue slots commitchan = thread $ liftIO $ go =<< readProgramFile where thread = NamedThread thisThread go program = forever $ inTransferSlot dstatus slots $ @@ -47,11 +47,11 @@ startTransfer :: ThreadState -> DaemonStatusHandle -> CommitChan -> FilePath -> startTransfer st dstatus commitchan program t info = case (transferRemote info, associatedFile info) of (Just remote, Just file) -> ifM (runThreadState st $ shouldTransfer t info) ( do - debug thisThread [ "Transferring:" , show t ] + brokendebug thisThread [ "Transferring:" , show t ] notifyTransfer dstatus return $ Just (t, info, transferprocess remote file) , do - debug thisThread [ "Skipping unnecessary transfer:" , show t ] + brokendebug thisThread [ "Skipping unnecessary transfer:" , show t ] void $ removeTransfer dstatus t return Nothing ) diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index 310a6e984..172b7976e 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -15,7 +15,6 @@ module Assistant.Threads.Watcher ( ) where import Assistant.Common -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Changes import Assistant.TransferQueue @@ -37,9 +36,6 @@ import Git.Types import Data.Bits.Utils import qualified Data.ByteString.Lazy as L -thisThread :: ThreadName -thisThread = "Watcher" - checkCanWatch :: Annex () checkCanWatch | canWatch = @@ -55,116 +51,122 @@ needLsof = error $ unlines , "Be warned: This can corrupt data in the annex, and make fsck complain." ] -watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread -watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do - void $ watchDir "." ignored hooks startup - debug thisThread [ "watching", "."] - where - startup = startupScan st dstatus - hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a - hooks = mkWatchHooks - { addHook = hook onAdd - , delHook = hook onDel - , addSymlinkHook = hook onAddSymlink - , delDirHook = hook onDelDir - , errHook = hook onErr - } +watchThread :: NamedThread +watchThread = NamedThread "Watcher" $ do + startup <- asIO startupScan + addhook <- hook onAdd + delhook <- hook onDel + addsymlinkhook <- hook onAddSymlink + deldirhook <- hook onDelDir + errhook <- hook onErr + let hooks = mkWatchHooks + { addHook = addhook + , delHook = delhook + , addSymlinkHook = addsymlinkhook + , delDirHook = deldirhook + , errHook = errhook + } + void $ liftIO $ watchDir "." ignored hooks startup + debug [ "watching", "."] + where + hook a = Just <$> asIO2 (runHandler a) {- Initial scartup scan. The action should return once the scan is complete. -} -startupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a -startupScan st dstatus scanner = do - runThreadState st $ showAction "scanning" - alertWhile' dstatus startupScanAlert $ do - r <- scanner +startupScan :: IO a -> Assistant a +startupScan scanner = do + liftAnnex $ showAction "scanning" + dstatus <- getAssistant daemonStatusHandle + alertWhile' dstatus startupScanAlert <~> do + r <- liftIO $ scanner -- Notice any files that were deleted before -- watching was started. - runThreadState st $ do + liftAnnex $ do inRepo $ Git.Command.run "add" [Param "--update"] showAction "started" - modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True } + liftIO $ modifyDaemonStatus_ dstatus $ + \s -> s { scanComplete = True } return (True, r) ignored :: FilePath -> Bool ignored = ig . takeFileName - where + where ig ".git" = True ig ".gitignore" = True ig ".gitattributes" = True ig _ = False -type Handler = ThreadName -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change) +type Handler = FilePath -> Maybe FileStatus -> Assistant (Maybe Change) -{- Runs an action handler, inside the Annex monad, and if there was a - - change, adds it to the ChangeChan. +{- Runs an action handler, and if there was a change, adds it to the ChangeChan. - - Exceptions are ignored, otherwise a whole watcher thread could be crashed. -} -runHandler :: ThreadName -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler threadname st dstatus transferqueue changechan handler file filestatus = void $ do - r <- tryIO go +runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant () +runHandler handler file filestatus = void $ do + r <- tryIO <~> handler file filestatus case r of - Left e -> print e + Left e -> liftIO $ print e Right Nothing -> noop - Right (Just change) -> recordChange changechan change - where - go = runThreadState st $ handler threadname file filestatus dstatus transferqueue + Right (Just change) -> do + -- Just in case the commit thread is not + -- flushing the queue fast enough. + liftAnnex $ Annex.Queue.flushWhenFull + flip recordChange change <<~ changeChan onAdd :: Handler -onAdd _ file filestatus _ _ - | maybe False isRegularFile filestatus = pendingAddChange file - | otherwise = noChange - where +onAdd file filestatus + | maybe False isRegularFile filestatus = liftIO $ pendingAddChange file + | otherwise = liftIO $ noChange {- A symlink might be an arbitrary symlink, which is just added. - Or, if it is a git-annex symlink, ensure it points to the content - before adding it. -} onAddSymlink :: Handler -onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.lookupFile file - where - go (Just (key, _)) = do - link <- calcGitLink file key - ifM ((==) link <$> liftIO (readSymbolicLink file)) - ( do - s <- liftIO $ getDaemonStatus dstatus - checkcontent key s - ensurestaged link s - , do - liftIO $ debug threadname ["fix symlink", file] - liftIO $ removeFile file - liftIO $ createSymbolicLink link file - checkcontent key =<< liftIO (getDaemonStatus dstatus) - addlink link - ) - go Nothing = do -- other symlink - link <- liftIO (readSymbolicLink file) - ensurestaged link =<< liftIO (getDaemonStatus dstatus) - - {- This is often called on symlinks that are already - - staged correctly. A symlink may have been deleted - - and being re-added, or added when the watcher was - - not running. So they're normally restaged to make sure. - - - - As an optimisation, during the startup scan, avoid - - restaging everything. Only links that were created since - - the last time the daemon was running are staged. - - (If the daemon has never ran before, avoid staging - - links too.) - -} - ensurestaged link daemonstatus - | scanComplete daemonstatus = addlink link - | otherwise = case filestatus of - Just s - | not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange - _ -> addlink link - - {- For speed, tries to reuse the existing blob for - - the symlink target. -} - addlink link = do - liftIO $ debug threadname ["add symlink", file] +onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file) + where + go (Just (key, _)) = do + link <- liftAnnex $ calcGitLink file key + ifM ((==) link <$> liftIO (readSymbolicLink file)) + ( do + s <- daemonStatus + checkcontent key s + ensurestaged link s + , do + liftIO $ removeFile file + liftIO $ createSymbolicLink link file + checkcontent key =<< daemonStatus + addlink link + ) + go Nothing = do -- other symlink + link <- liftIO (readSymbolicLink file) + ensurestaged link =<< daemonStatus + + {- This is often called on symlinks that are already + - staged correctly. A symlink may have been deleted + - and being re-added, or added when the watcher was + - not running. So they're normally restaged to make sure. + - + - As an optimisation, during the startup scan, avoid + - restaging everything. Only links that were created since + - the last time the daemon was running are staged. + - (If the daemon has never ran before, avoid staging + - links too.) + -} + ensurestaged link daemonstatus + | scanComplete daemonstatus = addlink link + | otherwise = case filestatus of + Just s + | not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> liftIO noChange + _ -> addlink link + + {- For speed, tries to reuse the existing blob for symlink target. -} + addlink link = do + debug ["add symlink", file] + liftAnnex $ do v <- catObjectDetails $ Ref $ ':':file case v of Just (currlink, sha) @@ -174,26 +176,28 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l sha <- inRepo $ Git.HashObject.hashObject BlobObject link stageSymlink file sha - madeChange file LinkChange - - {- When a new link appears, or a link is changed, - - after the startup scan, handle getting or - - dropping the key's content. -} - checkcontent key daemonstatus - | scanComplete daemonstatus = do - present <- inAnnex key - unless present $ - queueTransfers Next transferqueue dstatus - key (Just file) Download - handleDrops dstatus present key (Just file) - | otherwise = noop + liftIO $ madeChange file LinkChange + + {- When a new link appears, or a link is changed, after the startup + - scan, handle getting or dropping the key's content. -} + checkcontent key daemonstatus + | scanComplete daemonstatus = do + present <- liftAnnex $ inAnnex key + dstatus <- getAssistant daemonStatusHandle + unless present $ do + transferqueue <- getAssistant transferQueue + liftAnnex $ queueTransfers Next transferqueue + dstatus key (Just file) Download + liftAnnex $ handleDrops dstatus present key (Just file) + | otherwise = noop onDel :: Handler -onDel threadname file _ _dstatus _ = do - liftIO $ debug threadname ["file deleted", file] - Annex.Queue.addUpdateIndex =<< - inRepo (Git.UpdateIndex.unstageFile file) - madeChange file RmChange +onDel file _ = do + debug ["file deleted", file] + liftAnnex $ + Annex.Queue.addUpdateIndex =<< + inRepo (Git.UpdateIndex.unstageFile file) + liftIO $ madeChange file RmChange {- A directory has been deleted, or moved, so tell git to remove anything - that was inside it from its cache. Since it could reappear at any time, @@ -203,18 +207,19 @@ onDel threadname file _ _dstatus _ = do - command to get the recursive list of files in the directory, so rm is - just as good. -} onDelDir :: Handler -onDelDir threadname dir _ _dstatus _ = do - liftIO $ debug threadname ["directory deleted", dir] - Annex.Queue.addCommand "rm" +onDelDir dir _ = do + debug ["directory deleted", dir] + liftAnnex $ Annex.Queue.addCommand "rm" [Params "--quiet -r --cached --ignore-unmatch --"] [dir] - madeChange dir RmDirChange + liftIO $ madeChange dir RmDirChange {- Called when there's an error with inotify or kqueue. -} onErr :: Handler -onErr _ msg _ dstatus _ = do - warning msg +onErr msg _ = do + liftAnnex $ warning msg + dstatus <- getAssistant daemonStatusHandle void $ liftIO $ addAlert dstatus $ warningAlert "watcher" msg - return Nothing + liftIO noChange {- Adds a symlink to the index, without ever accessing the actual symlink - on disk. This avoids a race if git add is used, where the symlink is diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs index 6ed827e01..02911bab9 100644 --- a/Assistant/Threads/WebApp.hs +++ b/Assistant/Threads/WebApp.hs @@ -28,12 +28,6 @@ import Assistant.WebApp.Configurators.XMPP import Assistant.WebApp.Documentation import Assistant.WebApp.OtherRepos import Assistant.ThreadedMonad -import Assistant.DaemonStatus -import Assistant.ScanRemotes -import Assistant.TransferQueue -import Assistant.TransferSlots -import Assistant.Pushes -import Assistant.Commits import Utility.WebApp import Utility.FileMode import Utility.TempFile @@ -51,53 +45,45 @@ mkYesodDispatch "WebApp" $(parseRoutesFile "Assistant/WebApp/routes") type Url = String -webAppThread - :: Maybe ThreadState - -> DaemonStatusHandle - -> ScanRemoteMap - -> TransferQueue - -> TransferSlots - -> PushNotifier - -> CommitChan +webAppThread + :: AssistantData -> UrlRenderer + -> Bool -> Maybe (IO String) -> Maybe (Url -> FilePath -> IO ()) -> NamedThread -webAppThread mst dstatus scanremotes transferqueue transferslots pushnotifier commitchan urlrenderer postfirstrun onstartup = thread $ do +webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ liftIO $ do webapp <- WebApp - <$> pure mst - <*> pure dstatus - <*> pure scanremotes - <*> pure transferqueue - <*> pure transferslots - <*> pure pushnotifier - <*> pure commitchan + <$> pure assistantdata <*> (pack <$> genRandomToken) - <*> getreldir mst + <*> getreldir <*> pure $(embed "static") <*> newWebAppState <*> pure postfirstrun + <*> pure noannex setUrlRenderer urlrenderer $ yesodRender webapp (pack "") app <- toWaiAppPlain webapp app' <- ifM debugEnabled ( return $ httpDebugLogger app , return app ) - runWebApp app' $ \port -> case mst of - Nothing -> withTempFile "webapp.html" $ \tmpfile _ -> + runWebApp app' $ \port -> if noannex + then withTempFile "webapp.html" $ \tmpfile _ -> go port webapp tmpfile Nothing - Just st -> do + else do + let st = threadState assistantdata htmlshim <- runThreadState st $ fromRepo gitAnnexHtmlShim urlfile <- runThreadState st $ fromRepo gitAnnexUrlFile go port webapp htmlshim (Just urlfile) where thread = NamedThread thisThread - getreldir Nothing = return Nothing - getreldir (Just st) = Just <$> - (relHome =<< absPath - =<< runThreadState st (fromRepo repoPath)) + getreldir + | noannex = return Nothing + | otherwise = Just <$> + (relHome =<< absPath + =<< runThreadState (threadState assistantdata) (fromRepo repoPath)) go port webapp htmlshim urlfile = do - debug thisThread ["running on port", show port] + brokendebug thisThread ["running on port", show port] let url = myUrl webapp port maybe noop (`writeFile` url) urlfile writeHtmlShim url htmlshim diff --git a/Backend.hs b/Backend.hs index d1dfdef3c..b66e6130e 100644 --- a/Backend.hs +++ b/Backend.hs @@ -40,16 +40,16 @@ orderedList = do if not $ null l then return l else handle =<< Annex.getState Annex.forcebackend - where - handle Nothing = standard - handle (Just "") = standard - handle (Just name) = do - l' <- (lookupBackendName name :) <$> standard - Annex.changeState $ \s -> s { Annex.backends = l' } - return l' - standard = parseBackendList <$> getConfig (annexConfig "backends") "" - parseBackendList [] = list - parseBackendList s = map lookupBackendName $ words s + where + handle Nothing = standard + handle (Just "") = standard + handle (Just name) = do + l' <- (lookupBackendName name :) <$> standard + Annex.changeState $ \s -> s { Annex.backends = l' } + return l' + standard = parseBackendList <$> getConfig (annexConfig "backends") "" + parseBackendList [] = list + parseBackendList s = map lookupBackendName $ words s {- Generates a key for a file, trying each backend in turn until one - accepts it. @@ -66,12 +66,12 @@ genKey' (b:bs) source = do case r of Nothing -> genKey' bs source Just k -> return $ Just (makesane k, b) - where - -- keyNames should not contain newline characters. - makesane k = k { keyName = map fixbadchar (keyName k) } - fixbadchar c - | c == '\n' = '_' - | otherwise = c + where + -- keyNames should not contain newline characters. + makesane k = k { keyName = map fixbadchar (keyName k) } + fixbadchar c + | c == '\n' = '_' + | otherwise = c {- Looks up the key and backend corresponding to an annexed file, - by examining what the file symlinks to. -} @@ -81,35 +81,33 @@ lookupFile file = do case tl of Left _ -> return Nothing Right l -> makekey l - where - makekey l = maybe (return Nothing) (makeret l) (fileKey $ takeFileName l) - makeret l k = let bname = keyBackendName k in - case maybeLookupBackendName bname of - Just backend -> do - return $ Just (k, backend) - Nothing -> do - when (isLinkToAnnex l) $ warning $ - "skipping " ++ file ++ - " (unknown backend " ++ - bname ++ ")" - return Nothing + where + makekey l = maybe (return Nothing) (makeret l) (fileKey $ takeFileName l) + makeret l k = let bname = keyBackendName k in + case maybeLookupBackendName bname of + Just backend -> do + return $ Just (k, backend) + Nothing -> do + when (isLinkToAnnex l) $ warning $ + "skipping " ++ file ++ + " (unknown backend " ++ bname ++ ")" + return Nothing {- Looks up the backend that should be used for a file. - That can be configured on a per-file basis in the gitattributes file. -} chooseBackend :: FilePath -> Annex (Maybe Backend) chooseBackend f = Annex.getState Annex.forcebackend >>= go - where - go Nothing = maybeLookupBackendName <$> - checkAttr "annex.backend" f - go (Just _) = Just . Prelude.head <$> orderedList + where + go Nothing = maybeLookupBackendName <$> checkAttr "annex.backend" f + go (Just _) = Just . Prelude.head <$> orderedList {- Looks up a backend by name. May fail if unknown. -} lookupBackendName :: String -> Backend lookupBackendName s = fromMaybe unknown $ maybeLookupBackendName s - where - unknown = error $ "unknown backend " ++ s + where + unknown = error $ "unknown backend " ++ s maybeLookupBackendName :: String -> Maybe Backend maybeLookupBackendName s = headMaybe matches - where - matches = filter (\b -> s == B.name b) list + where + matches = filter (\b -> s == B.name b) list diff --git a/Command/WebApp.hs b/Command/WebApp.hs index 5a372f94d..aff760ee4 100644 --- a/Command/WebApp.hs +++ b/Command/WebApp.hs @@ -12,11 +12,6 @@ import Command import Assistant import Assistant.Common import Assistant.DaemonStatus -import Assistant.ScanRemotes -import Assistant.TransferQueue -import Assistant.TransferSlots -import Assistant.Pushes -import Assistant.Commits import Assistant.Threads.WebApp import Assistant.WebApp import Assistant.Install @@ -101,20 +96,21 @@ autoStart autostartfile = do -} firstRun :: IO () firstRun = do + {- Without a repository, we cannot have an Annex monad, so cannot + - get a ThreadState. Using undefined is only safe because the + - webapp checks its noAnnex field before accessing the + - threadstate. -} + let st = undefined + {- Get a DaemonStatus without running in the Annex monad. -} dstatus <- atomically . newTMVar =<< newDaemonStatus - scanremotes <- newScanRemoteMap - transferqueue <- newTransferQueue - transferslots <- newTransferSlots + d <- newAssistantData st dstatus urlrenderer <- newUrlRenderer - pushnotifier <- newPushNotifier - commitchan <- newCommitChan v <- newEmptyMVar let callback a = Just $ a v - void $ runNamedThread dstatus $ - webAppThread Nothing dstatus scanremotes - transferqueue transferslots pushnotifier commitchan - urlrenderer - (callback signaler) (callback mainthread) + void $ flip runAssistant d $ runNamedThread $ + webAppThread d urlrenderer True + (callback signaler) + (callback mainthread) where signaler v = do putMVar v "" @@ -54,9 +54,9 @@ getMatcher' = do {- Adds something to the limit list, which is built up reversed. -} add :: Utility.Matcher.Token (Annex.FileInfo -> Annex Bool) -> Annex () add l = Annex.changeState $ \s -> s { Annex.limit = prepend $ Annex.limit s } - where - prepend (Left ls) = Left $ l:ls - prepend _ = error "internal" + where + prepend (Left ls) = Left $ l:ls + prepend _ = error "internal" {- Adds a new token. -} addToken :: String -> Annex () @@ -83,9 +83,9 @@ limitExclude glob = Right $ const $ return . not . matchglob glob matchglob :: String -> Annex.FileInfo -> Bool matchglob glob (Annex.FileInfo { Annex.matchFile = f }) = isJust $ match cregex f [] - where - cregex = compile regex [] - regex = '^':wildToRegex glob + where + cregex = compile regex [] + regex = '^':wildToRegex glob {- Adds a limit to skip files not believed to be present - in a specfied repository. -} @@ -97,21 +97,21 @@ limitIn name = Right $ \notpresent -> check $ if name == "." then inhere notpresent else inremote notpresent - where - check a = lookupFile >=> handle a - handle _ Nothing = return False - handle a (Just (key, _)) = a key - inremote notpresent key = do - u <- Remote.nameToUUID name - us <- Remote.keyLocations key - return $ u `elem` us && u `S.notMember` notpresent - inhere notpresent key - | S.null notpresent = inAnnex key - | otherwise = do - u <- getUUID - if u `S.member` notpresent - then return False - else inAnnex key + where + check a = lookupFile >=> handle a + handle _ Nothing = return False + handle a (Just (key, _)) = a key + inremote notpresent key = do + u <- Remote.nameToUUID name + us <- Remote.keyLocations key + return $ u `elem` us && u `S.notMember` notpresent + inhere notpresent key + | S.null notpresent = inAnnex key + | otherwise = do + u <- getUUID + if u `S.member` notpresent + then return False + else inAnnex key {- Limit to content that is currently present on a uuid. -} limitPresent :: Maybe UUID -> MkLimit @@ -122,10 +122,10 @@ limitPresent u _ = Right $ const $ check $ \key -> do else do us <- Remote.keyLocations key return $ maybe False (`elem` us) u - where - check a = lookupFile >=> handle a - handle _ Nothing = return False - handle a (Just (key, _)) = a key + where + check a = lookupFile >=> handle a + handle _ Nothing = return False + handle a (Just (key, _)) = a key {- Adds a limit to skip files not believed to have the specified number - of copies. -} @@ -139,18 +139,18 @@ limitCopies want = case split ":" want of Nothing -> go n $ checkgroup v [n] -> go n $ const $ return True _ -> Left "bad value for copies" - where - go num good = case readish num of - Nothing -> Left "bad number for copies" - Just n -> Right $ \notpresent f -> - lookupFile f >>= handle n good notpresent - handle _ _ _ Nothing = return False - handle n good notpresent (Just (key, _)) = do - us <- filter (`S.notMember` notpresent) - <$> (filterM good =<< Remote.keyLocations key) - return $ length us >= n - checktrust t u = (== t) <$> lookupTrust u - checkgroup g u = S.member g <$> lookupGroups u + where + go num good = case readish num of + Nothing -> Left "bad number for copies" + Just n -> Right $ \notpresent f -> + lookupFile f >>= handle n good notpresent + handle _ _ _ Nothing = return False + handle n good notpresent (Just (key, _)) = do + us <- filter (`S.notMember` notpresent) + <$> (filterM good =<< Remote.keyLocations key) + return $ length us >= n + checktrust t u = (== t) <$> lookupTrust u + checkgroup g u = S.member g <$> lookupGroups u {- Adds a limit to skip files not believed to be present in all - repositories in the specified group. -} @@ -163,15 +163,15 @@ limitInAllGroup :: GroupMap -> MkLimit limitInAllGroup m groupname | S.null want = Right $ const $ const $ return True | otherwise = Right $ \notpresent -> lookupFile >=> check notpresent - where - want = fromMaybe S.empty $ M.lookup groupname $ uuidsByGroup m - check _ Nothing = return False - check notpresent (Just (key, _)) - -- optimisation: Check if a wanted uuid is notpresent. - | not (S.null (S.intersection want notpresent)) = return False - | otherwise = do - present <- S.fromList <$> Remote.keyLocations key - return $ S.null $ want `S.difference` present + where + want = fromMaybe S.empty $ M.lookup groupname $ uuidsByGroup m + check _ Nothing = return False + check notpresent (Just (key, _)) + -- optimisation: Check if a wanted uuid is notpresent. + | not (S.null (S.intersection want notpresent)) = return False + | otherwise = do + present <- S.fromList <$> Remote.keyLocations key + return $ S.null $ want `S.difference` present {- Adds a limit to skip files not using a specified key-value backend. -} addInBackend :: String -> Annex () @@ -179,9 +179,9 @@ addInBackend = addLimit . limitInBackend limitInBackend :: MkLimit limitInBackend name = Right $ const $ lookupFile >=> check - where - wanted = Backend.lookupBackendName name - check = return . maybe False ((==) wanted . snd) + where + wanted = Backend.lookupBackendName name + check = return . maybe False ((==) wanted . snd) {- Adds a limit to skip files that are too large or too small -} addLargerThan :: String -> Annex () @@ -194,9 +194,9 @@ limitSize :: (Maybe Integer -> Maybe Integer -> Bool) -> MkLimit limitSize vs s = case readSize dataUnits s of Nothing -> Left "bad size" Just sz -> Right $ const $ lookupFile >=> check sz - where - check _ Nothing = return False - check sz (Just (key, _)) = return $ keySize key `vs` Just sz + where + check _ Nothing = return False + check sz (Just (key, _)) = return $ keySize key `vs` Just sz addTimeLimit :: String -> Annex () addTimeLimit s = do diff --git a/Messages.hs b/Messages.hs index 08a17b25c..f3cd9fc0e 100644 --- a/Messages.hs +++ b/Messages.hs @@ -65,29 +65,29 @@ showProgress = handle q $ - The action is passed a callback to use to update the meter. -} metered :: (Maybe MeterUpdate) -> Key -> (MeterUpdate -> Annex a) -> Annex a metered combinemeterupdate key a = withOutputType $ go (keySize key) - where - go (Just size) NormalOutput = do - progress <- liftIO $ newProgress "" size - meter <- liftIO $ newMeter progress "B" 25 (renderNums binaryOpts 1) - showOutput - liftIO $ displayMeter stdout meter - r <- a $ \n -> liftIO $ do - incrP progress n - displayMeter stdout meter - maybe noop (\m -> m n) combinemeterupdate - liftIO $ clearMeter stdout meter - return r - go _ _ = a (const noop) + where + go (Just size) NormalOutput = do + progress <- liftIO $ newProgress "" size + meter <- liftIO $ newMeter progress "B" 25 (renderNums binaryOpts 1) + showOutput + liftIO $ displayMeter stdout meter + r <- a $ \n -> liftIO $ do + incrP progress n + displayMeter stdout meter + maybe noop (\m -> m n) combinemeterupdate + liftIO $ clearMeter stdout meter + return r + go _ _ = a (const noop) showSideAction :: String -> Annex () showSideAction m = Annex.getState Annex.output >>= go - where - go (MessageState v StartBlock) = do - p - Annex.changeState $ \s -> s { Annex.output = MessageState v InBlock } - go (MessageState _ InBlock) = return () - go _ = p - p = handle q $ putStrLn $ "(" ++ m ++ "...)" + where + go (MessageState v StartBlock) = do + p + Annex.changeState $ \s -> s { Annex.output = MessageState v InBlock } + go (MessageState _ InBlock) = return () + go _ = p + p = handle q $ putStrLn $ "(" ++ m ++ "...)" showStoringStateAction :: Annex () showStoringStateAction = showSideAction "Recording state in git" @@ -106,8 +106,8 @@ doSideAction' b a = do o <- Annex.getState Annex.output set $ o { sideActionBlock = b } set o `after` a - where - set o = Annex.changeState $ \s -> s { Annex.output = o } + where + set o = Annex.changeState $ \s -> s { Annex.output = o } showOutput :: Annex () showOutput = handle q $ @@ -125,10 +125,10 @@ showEndFail = showEndResult False showEndResult :: Bool -> Annex () showEndResult ok = handle (JSON.end ok) $ putStrLn msg - where - msg - | ok = "ok" - | otherwise = "failed" + where + msg + | ok = "ok" + | otherwise = "failed" showErr :: (Show a) => a -> Annex () showErr e = warning' $ "git-annex: " ++ show e @@ -153,9 +153,9 @@ maybeShowJSON v = handle (JSON.add v) q {- Shows a complete JSON value, only when in json mode. -} showFullJSON :: JSON a => [(String, a)] -> Annex Bool showFullJSON v = withOutputType $ liftIO . go - where - go JSONOutput = JSON.complete v >> return True - go _ = return False + where + go JSONOutput = JSON.complete v >> return True + go _ = return False {- Performs an action that outputs nonstandard/customized output, and - in JSON mode wraps its output in JSON.start and JSON.end, so it's @@ -184,10 +184,10 @@ setupConsole = do handle :: IO () -> IO () -> Annex () handle json normal = withOutputType go - where - go NormalOutput = liftIO normal - go QuietOutput = q - go JSONOutput = liftIO $ flushed json + where + go NormalOutput = liftIO normal + go QuietOutput = q + go JSONOutput = liftIO $ flushed json q :: Monad m => m () q = noop @@ -23,30 +23,30 @@ usage header cmds commonoptions = unlines $ , "Commands:" , "" ] ++ cmdlines - where - -- To get consistent indentation of options, generate the - -- usage for all options at once. A command's options will - -- be displayed after the command. - alloptlines = filter (not . null) $ - lines $ usageInfo "" $ - concatMap cmdoptions scmds ++ commonoptions - (cmdlines, optlines) = go scmds alloptlines [] - go [] os ls = (ls, os) - go (c:cs) os ls = go cs os' (ls++(l:o)) - where - (o, os') = splitAt (length $ cmdoptions c) os - l = concat - [ cmdname c - , namepad (cmdname c) - , cmdparamdesc c - , descpad (cmdparamdesc c) - , cmddesc c - ] - pad n s = replicate (n - length s) ' ' - namepad = pad $ longest cmdname + 1 - descpad = pad $ longest cmdparamdesc + 2 - longest f = foldl max 0 $ map (length . f) cmds - scmds = sort cmds + where + -- To get consistent indentation of options, generate the + -- usage for all options at once. A command's options will + -- be displayed after the command. + alloptlines = filter (not . null) $ + lines $ usageInfo "" $ + concatMap cmdoptions scmds ++ commonoptions + (cmdlines, optlines) = go scmds alloptlines [] + go [] os ls = (ls, os) + go (c:cs) os ls = go cs os' (ls++(l:o)) + where + (o, os') = splitAt (length $ cmdoptions c) os + l = concat + [ cmdname c + , namepad (cmdname c) + , cmdparamdesc c + , descpad (cmdparamdesc c) + , cmddesc c + ] + pad n s = replicate (n - length s) ' ' + namepad = pad $ longest cmdname + 1 + descpad = pad $ longest cmdparamdesc + 2 + longest f = foldl max 0 $ map (length . f) cmds + scmds = sort cmds {- Descriptions of params used in usage messages. -} paramPaths :: String |