diff options
author | Joey Hess <joey@kitenet.net> | 2012-10-29 11:40:22 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-10-29 11:40:22 -0400 |
commit | f901112e1ce30f43dc7294e0bd0616bb02556500 (patch) | |
tree | 92ab6d6f220ea21e0cc7feeff6caca52d4d2b677 | |
parent | 710dfa7e3ec897d6f02930540b10bb303e3a9c91 (diff) |
converted 6 more threads
-rw-r--r-- | Assistant.hs | 16 | ||||
-rw-r--r-- | Assistant/Threads/Committer.hs | 215 | ||||
-rw-r--r-- | Assistant/Threads/ConfigMonitor.hs | 72 | ||||
-rw-r--r-- | Assistant/Threads/Merger.hs | 72 | ||||
-rw-r--r-- | Assistant/Threads/PushNotifier.hs | 119 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 58 | ||||
-rw-r--r-- | Assistant/Threads/TransferScanner.hs | 164 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 4 |
8 files changed, 350 insertions, 370 deletions
diff --git a/Assistant.hs b/Assistant.hs index 07f022aa6..be4cbeb9c 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -182,26 +182,24 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do dstatus <- getAssistant daemonStatusHandle changechan <- getAssistant changeChan commitchan <- getAssistant commitChan - pushmap <- getAssistant failedPushMap transferqueue <- getAssistant transferQueue transferslots <- getAssistant transferSlots scanremotes <- getAssistant scanRemoteMap - branchhandle <- getAssistant branchChangeHandle pushnotifier <- getAssistant pushNotifier #ifdef WITH_WEBAPP urlrenderer <- liftIO newUrlRenderer #endif mapM_ (startthread d) - [ watch $ commitThread st changechan commitchan transferqueue dstatus + [ watch $ commitThread #ifdef WITH_WEBAPP , assist $ webAppThread d urlrenderer False Nothing webappwaiter #ifdef WITH_PAIRING , assist $ pairListenerThread st dstatus scanremotes urlrenderer #endif #endif - , assist $ pushThread st dstatus commitchan pushmap pushnotifier - , assist $ pushRetryThread st dstatus pushmap pushnotifier - , assist $ mergeThread st dstatus transferqueue branchhandle + , assist $ pushThread + , assist $ pushRetryThread + , assist $ mergeThread , assist $ transferWatcherThread st dstatus transferqueue , assist $ transferPollerThread , assist $ transfererThread st dstatus transferqueue transferslots commitchan @@ -210,10 +208,10 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do , assist $ mountWatcherThread st dstatus scanremotes pushnotifier , assist $ netWatcherThread , assist $ netWatcherFallbackThread - , assist $ transferScannerThread st dstatus scanremotes transferqueue - , assist $ configMonitorThread st dstatus branchhandle commitchan + , assist $ transferScannerThread + , assist $ configMonitorThread #ifdef WITH_XMPP - , assist $ pushNotifierThread st dstatus pushnotifier + , assist $ pushNotifierThread #endif , watch $ watchThread ] diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index 11c6dc584..2ab693f05 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -13,7 +13,6 @@ import Assistant.Common import Assistant.Changes import Assistant.Commits import Assistant.Alert -import Assistant.ThreadedMonad import Assistant.Threads.Watcher import Assistant.TransferQueue import Assistant.DaemonStatus @@ -37,48 +36,40 @@ import Data.Tuple.Utils import qualified Data.Set as S import Data.Either -thisThread :: ThreadName -thisThread = "Committer" - {- This thread makes git commits at appropriate times. -} -commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread -commitThread st changechan commitchan transferqueue dstatus = thread $ liftIO $ do - delayadd <- runThreadState st $ +commitThread :: NamedThread +commitThread = NamedThread "Committer" $ do + delayadd <- liftAnnex $ maybe delayaddDefault (Just . Seconds) . readish <$> getConfig (annexConfig "delayadd") "" - runEvery (Seconds 1) $ do + runEvery (Seconds 1) <~> do -- We already waited one second as a simple rate limiter. -- Next, wait until at least one change is available for -- processing. - changes <- getChanges changechan + changes <- getChanges <<~ changeChan -- Now see if now's a good time to commit. - time <- getCurrentTime + time <- liftIO getCurrentTime if shouldCommit time changes then do - readychanges <- handleAdds delayadd st changechan transferqueue dstatus changes + readychanges <- handleAdds delayadd changes if shouldCommit time readychanges then do - brokendebug thisThread + debug [ "committing" , show (length readychanges) , "changes" ] - void $ alertWhile dstatus commitAlert $ - runThreadState st commitStaged - recordCommit commitchan + dstatus <- getAssistant daemonStatusHandle + void $ alertWhile dstatus commitAlert <~> + liftAnnex commitStaged + recordCommit <<~ commitChan else refill readychanges else refill changes - where - thread = NamedThread thisThread - refill [] = noop - refill cs = do - brokendebug thisThread - [ "delaying commit of" - , show (length cs) - , "changes" - ] - refillChanges changechan cs - + where + refill [] = noop + refill cs = do + debug ["delaying commit of", show (length cs), "changes"] + flip refillChanges cs <<~ changeChan commitStaged :: Annex Bool commitStaged = do @@ -99,12 +90,12 @@ commitStaged = do - each other out, etc. Git returns nonzero on those, - so don't propigate out commit failures. -} return True - where - nomessage ps - | Git.Version.older "1.7.2" = Param "-m" - : Param "autocommit" : ps - | otherwise = Param "--allow-empty-message" - : Param "-m" : Param "" : ps + where + nomessage ps + | Git.Version.older "1.7.2" = Param "-m" + : Param "autocommit" : ps + | otherwise = Param "--allow-empty-message" + : Param "-m" : Param "" : ps {- Decide if now is a good time to make a commit. - Note that the list of change times has an undefined order. @@ -118,9 +109,9 @@ shouldCommit now changes | len > 10000 = True -- avoid bloating queue too much | length (filter thisSecond changes) < 10 = True | otherwise = False -- batch activity - where - len = length changes - thisSecond c = now `diffUTCTime` changeTime c <= 1 + where + len = length changes + thisSecond c = now `diffUTCTime` changeTime c <= 1 {- OSX needs a short delay after a file is added before locking it down, - as pasting a file seems to try to set file permissions or otherwise @@ -152,77 +143,82 @@ delayaddDefault = Nothing - Any pending adds that are not ready yet are put back into the ChangeChan, - where they will be retried later. -} -handleAdds :: Maybe Seconds -> ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change] -handleAdds delayadd st changechan transferqueue dstatus cs = returnWhen (null incomplete) $ do +handleAdds :: Maybe Seconds -> [Change] -> Assistant [Change] +handleAdds delayadd cs = returnWhen (null incomplete) $ do let (pending, inprocess) = partition isPendingAddChange incomplete pending' <- findnew pending - (postponed, toadd) <- partitionEithers <$> safeToAdd delayadd st pending' inprocess + (postponed, toadd) <- partitionEithers <$> safeToAdd delayadd pending' inprocess unless (null postponed) $ - refillChanges changechan postponed + flip refillChanges postponed <<~ changeChan returnWhen (null toadd) $ do added <- catMaybes <$> forM toadd add if DirWatcher.eventsCoalesce || null added then return $ added ++ otherchanges else do - r <- handleAdds delayadd st changechan transferqueue dstatus - =<< getChanges changechan + r <- handleAdds delayadd + =<< getChanges <<~ changeChan return $ r ++ added ++ otherchanges - where - (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs + where + (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs - findnew [] = return [] - findnew pending@(exemplar:_) = do - (!newfiles, cleanup) <- runThreadState st $ - inRepo (Git.LsFiles.notInRepo False $ map changeFile pending) - void cleanup - -- note: timestamp info is lost here - let ts = changeTime exemplar - return $ map (PendingAddChange ts) newfiles + findnew [] = return [] + findnew pending@(exemplar:_) = do + (!newfiles, cleanup) <- liftAnnex $ + inRepo (Git.LsFiles.notInRepo False $ map changeFile pending) + void $ liftIO cleanup + -- note: timestamp info is lost here + let ts = changeTime exemplar + return $ map (PendingAddChange ts) newfiles + + returnWhen c a + | c = return otherchanges + | otherwise = a - returnWhen c a - | c = return otherchanges - | otherwise = a + add :: Change -> Assistant (Maybe Change) + add change@(InProcessAddChange { keySource = ks }) = do + dstatus <- getAssistant daemonStatusHandle + alertWhile' dstatus (addFileAlert $ keyFilename ks) <~> add' change ks + add _ = return Nothing - add :: Change -> IO (Maybe Change) - add change@(InProcessAddChange { keySource = ks }) = - alertWhile' dstatus (addFileAlert $ keyFilename ks) $ - liftM ret $ catchMaybeIO $ - sanitycheck ks $ runThreadState st $ do - showStart "add" $ keyFilename ks - key <- Command.Add.ingest ks - done (finishedChange change) (keyFilename ks) key - where - {- Add errors tend to be transient and will - - be automatically dealt with, so don't - - pass to the alert code. -} - ret (Just j@(Just _)) = (True, j) - ret _ = (True, Nothing) - add _ = return Nothing + add' change ks = liftM ret $ catchMaybeIO <~> do + sanitycheck ks $ do + key <- liftAnnex $ do + showStart "add" $ keyFilename ks + Command.Add.ingest ks + done (finishedChange change) (keyFilename ks) key + where + {- Add errors tend to be transient and will be automatically + - dealt with, so don't pass to the alert code. -} + ret (Just j@(Just _)) = (True, j) + ret _ = (True, Nothing) - done _ _ Nothing = do - showEndFail - return Nothing - done change file (Just key) = do - link <- Command.Add.link file key True - when DirWatcher.eventsCoalesce $ do + done _ _ Nothing = do + liftAnnex showEndFail + return Nothing + done change file (Just key) = do + link <- liftAnnex $ Command.Add.link file key True + when DirWatcher.eventsCoalesce $ + liftAnnex $ do sha <- inRepo $ Git.HashObject.hashObject BlobObject link stageSymlink file sha - queueTransfers Next transferqueue dstatus key (Just file) Upload - showEndOk - return $ Just change + showEndOk + transferqueue <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftAnnex $ queueTransfers Next transferqueue dstatus key (Just file) Upload + return $ Just change - {- Check that the keysource's keyFilename still exists, - - and is still a hard link to its contentLocation, - - before ingesting it. -} - sanitycheck keysource a = do - fs <- getSymbolicLinkStatus $ keyFilename keysource - ks <- getSymbolicLinkStatus $ contentLocation keysource - if deviceID ks == deviceID fs && fileID ks == fileID fs - then a - else return Nothing + {- Check that the keysource's keyFilename still exists, + - and is still a hard link to its contentLocation, + - before ingesting it. -} + sanitycheck keysource a = do + fs <- liftIO $ getSymbolicLinkStatus $ keyFilename keysource + ks <- liftIO $ getSymbolicLinkStatus $ contentLocation keysource + if deviceID ks == deviceID fs && fileID ks == fileID fs + then a + else return Nothing {- Files can Either be Right to be added now, - or are unsafe, and must be Left for later. @@ -230,11 +226,11 @@ handleAdds delayadd st changechan transferqueue dstatus cs = returnWhen (null in - Check by running lsof on the temp directory, which - the KeySources are locked down in. -} -safeToAdd :: Maybe Seconds -> ThreadState -> [Change] -> [Change] -> IO [Either Change Change] -safeToAdd _ _ [] [] = return [] -safeToAdd delayadd st pending inprocess = do - maybe noop threadDelaySeconds delayadd - runThreadState st $ do +safeToAdd :: Maybe Seconds -> [Change] -> [Change] -> Assistant [Either Change Change] +safeToAdd _ [] [] = return [] +safeToAdd delayadd pending inprocess = do + maybe noop (liftIO . threadDelaySeconds) delayadd + liftAnnex $ do keysources <- mapM Command.Add.lockDown (map changeFile pending) let inprocess' = map mkinprocess (zip pending keysources) tmpdir <- fromRepo gitAnnexTmpDir @@ -250,25 +246,24 @@ safeToAdd delayadd st pending inprocess = do mapM_ canceladd $ lefts checked allRight $ rights checked else return checked - where - check openfiles change@(InProcessAddChange { keySource = ks }) - | S.member (contentLocation ks) openfiles = Left change - check _ change = Right change + where + check openfiles change@(InProcessAddChange { keySource = ks }) + | S.member (contentLocation ks) openfiles = Left change + check _ change = Right change - mkinprocess (c, ks) = InProcessAddChange - { changeTime = changeTime c - , keySource = ks - } + mkinprocess (c, ks) = InProcessAddChange + { changeTime = changeTime c + , keySource = ks + } - canceladd (InProcessAddChange { keySource = ks }) = do - warning $ keyFilename ks - ++ " still has writers, not adding" - -- remove the hard link - void $ liftIO $ tryIO $ - removeFile $ contentLocation ks - canceladd _ = noop + canceladd (InProcessAddChange { keySource = ks }) = do + warning $ keyFilename ks + ++ " still has writers, not adding" + -- remove the hard link + void $ liftIO $ tryIO $ removeFile $ contentLocation ks + canceladd _ = noop - openwrite (_file, mode, _pid) = - mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite + openwrite (_file, mode, _pid) = + mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite - allRight = return . map Right + allRight = return . map Right diff --git a/Assistant/Threads/ConfigMonitor.hs b/Assistant/Threads/ConfigMonitor.hs index 2d5df48dd..fe98b10e8 100644 --- a/Assistant/Threads/ConfigMonitor.hs +++ b/Assistant/Threads/ConfigMonitor.hs @@ -9,7 +9,6 @@ module Assistant.Threads.ConfigMonitor where import Assistant.Common import Assistant.BranchChange -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Commits import Utility.ThreadScheduler @@ -19,10 +18,8 @@ import Logs.Remote import Logs.PreferredContent import Logs.Group import Remote.List (remoteListRefresh) -import qualified Git import qualified Git.LsTree as LsTree import qualified Annex.Branch -import qualified Annex import qualified Data.Set as S @@ -37,26 +34,22 @@ thisThread = "ConfigMonitor" - if the branch has not changed in a while, configuration changes will - be detected immediately. -} -configMonitorThread :: ThreadState -> DaemonStatusHandle -> BranchChangeHandle -> CommitChan -> NamedThread -configMonitorThread st dstatus branchhandle commitchan = thread $ liftIO $ do - r <- runThreadState st Annex.gitRepo - go r =<< getConfigs r - where - thread = NamedThread thisThread - - go r old = do - threadDelaySeconds (Seconds 60) - waitBranchChange branchhandle - new <- getConfigs r - when (old /= new) $ do - let changedconfigs = new `S.difference` old - brokendebug thisThread $ "reloading config" : - map fst (S.toList changedconfigs) - reloadConfigs st dstatus changedconfigs - {- Record a commit to get this config - - change pushed out to remotes. -} - recordCommit commitchan - go r new +configMonitorThread :: NamedThread +configMonitorThread = NamedThread "ConfigMonitor" $ loop =<< getConfigs + where + loop old = do + liftIO $ threadDelaySeconds (Seconds 60) + waitBranchChange <<~ branchChangeHandle + new <- getConfigs + when (old /= new) $ do + let changedconfigs = new `S.difference` old + debug $ "reloading config" : + map fst (S.toList changedconfigs) + reloadConfigs new + {- Record a commit to get this config + - change pushed out to remotes. -} + recordCommit <<~ commitChan + loop new {- Config files, and their checksums. -} type Configs = S.Set (FilePath, String) @@ -73,22 +66,23 @@ configFilesActions = , (preferredContentLog, noop) ] -reloadConfigs :: ThreadState -> DaemonStatusHandle -> Configs -> IO () -reloadConfigs st dstatus changedconfigs = runThreadState st $ do - sequence_ as - void preferredContentMapLoad +reloadConfigs :: Configs -> Assistant () +reloadConfigs changedconfigs = do + liftAnnex $ do + sequence_ as + void preferredContentMapLoad {- Changes to the remote log, or the trust log, can affect the - syncRemotes list -} - when (Logs.Remote.remoteLog `elem` fs || Logs.Trust.trustLog `elem` fs) $ - updateSyncRemotes dstatus - where - (fs, as) = unzip $ filter (flip S.member changedfiles . fst) - configFilesActions - changedfiles = S.map fst changedconfigs + when (Logs.Remote.remoteLog `elem` fs || Logs.Trust.trustLog `elem` fs) $ + liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle + where + (fs, as) = unzip $ filter (flip S.member changedfiles . fst) + configFilesActions + changedfiles = S.map fst changedconfigs -getConfigs :: Git.Repo -> IO Configs -getConfigs r = S.fromList . map extract - <$> LsTree.lsTreeFiles Annex.Branch.fullname files r - where - files = map fst configFilesActions - extract treeitem = (LsTree.file treeitem, LsTree.sha treeitem) +getConfigs :: Assistant Configs +getConfigs = S.fromList . map extract + <$> liftAnnex (inRepo $ LsTree.lsTreeFiles Annex.Branch.fullname files) + where + files = map fst configFilesActions + extract treeitem = (LsTree.file treeitem, LsTree.sha treeitem) diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs index 152b40361..a766c5977 100644 --- a/Assistant/Threads/Merger.hs +++ b/Assistant/Threads/Merger.hs @@ -8,8 +8,6 @@ module Assistant.Threads.Merger where import Assistant.Common -import Assistant.ThreadedMonad -import Assistant.DaemonStatus import Assistant.TransferQueue import Assistant.BranchChange import Utility.DirWatcher @@ -24,36 +22,34 @@ thisThread = "Merger" {- This thread watches for changes to .git/refs/, and handles incoming - pushes. -} -mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> NamedThread -mergeThread st dstatus transferqueue branchchange = thread $ liftIO $ do - g <- runThreadState st gitRepo +mergeThread :: NamedThread +mergeThread = NamedThread "Merger" $ do + g <- liftAnnex gitRepo let dir = Git.localGitDir g </> "refs" - createDirectoryIfMissing True dir - let hook a = Just $ runHandler st dstatus transferqueue branchchange a + liftIO $ createDirectoryIfMissing True dir + let hook a = Just <$> asIO2 (runHandler a) + addhook <- hook onAdd + errhook <- hook onErr let hooks = mkWatchHooks - { addHook = hook onAdd - , errHook = hook onErr + { addHook = addhook + , errHook = errhook } - void $ watchDir dir (const False) hooks id - brokendebug thisThread ["watching", dir] - where - thread = NamedThread thisThread + void $ liftIO $ watchDir dir (const False) hooks id + debug ["watching", dir] -type Handler = ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> FilePath -> Maybe FileStatus -> IO () +type Handler = FilePath -> Assistant () {- Runs an action handler. - - Exceptions are ignored, otherwise a whole thread could be crashed. -} -runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler st dstatus transferqueue branchchange handler file filestatus = void $ - either print (const noop) =<< tryIO go - where - go = handler st dstatus transferqueue branchchange file filestatus +runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant () +runHandler handler file _filestatus = + either (liftIO . print) (const noop) =<< tryIO <~> handler file {- Called when there's an error with inotify. -} onErr :: Handler -onErr _ _ _ _ msg _ = error msg +onErr msg = error msg {- Called when a new branch ref is written. - @@ -67,29 +63,29 @@ onErr _ _ _ _ msg _ = error msg - ran are merged in. -} onAdd :: Handler -onAdd st dstatus transferqueue branchchange file _ +onAdd file | ".lock" `isSuffixOf` file = noop | isAnnexBranch file = do - branchChanged branchchange - runThreadState st $ + branchChanged <<~ branchChangeHandle + transferqueue <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftAnnex $ whenM Annex.Branch.forceUpdate $ queueDeferredDownloads Later transferqueue dstatus - | "/synced/" `isInfixOf` file = runThreadState st $ do - mergecurrent =<< inRepo Git.Branch.current + | "/synced/" `isInfixOf` file = do + mergecurrent =<< liftAnnex (inRepo Git.Branch.current) | otherwise = noop - where - changedbranch = fileToBranch file - mergecurrent (Just current) - | equivBranches changedbranch current = do - liftIO $ brokendebug thisThread - [ "merging" - , show changedbranch - , "into" - , show current - ] - void $ inRepo $ - Git.Merge.mergeNonInteractive changedbranch - mergecurrent _ = noop + where + changedbranch = fileToBranch file + mergecurrent (Just current) + | equivBranches changedbranch current = do + debug + [ "merging", show changedbranch + , "into", show current + ] + void $ liftAnnex $ inRepo $ + Git.Merge.mergeNonInteractive changedbranch + mergecurrent _ = noop equivBranches :: Git.Ref -> Git.Ref -> Bool equivBranches x y = base x == base y diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs index 591c8b18b..d19369b8d 100644 --- a/Assistant/Threads/PushNotifier.hs +++ b/Assistant/Threads/PushNotifier.hs @@ -12,7 +12,6 @@ module Assistant.Threads.PushNotifier where import Assistant.Common import Assistant.XMPP -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Pushes import Assistant.Sync @@ -25,56 +24,56 @@ import qualified Data.Set as S import qualified Git.Branch import Data.Time.Clock -thisThread :: ThreadName -thisThread = "PushNotifier" +pushNotifierThread :: NamedThread +pushNotifierThread = NamedThread "PushNotifier" $ do + iodebug <- asIO debug + iopull <- asIO pull + pn <- getAssistant pushNotifier + controllerThread pn <~> xmppClient pn iodebug iopull controllerThread :: PushNotifier -> IO () -> IO () -controllerThread pushnotifier a = forever $ do - tid <- forkIO a +controllerThread pushnotifier xmppclient = forever $ do + tid <- forkIO xmppclient waitRestart pushnotifier killThread tid -pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread -pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ liftIO $ - controllerThread pushnotifier $ do - v <- runThreadState st $ getXMPPCreds - case v of - Nothing -> noop - Just c -> loop c =<< getCurrentTime - where - loop c starttime = do - void $ connectXMPP c $ \jid -> do - fulljid <- bindJID jid - liftIO $ brokendebug thisThread ["XMPP connected", show fulljid] - putStanza $ gitAnnexPresence gitAnnexSignature - s <- getSession - _ <- liftIO $ forkIO $ void $ runXMPP s $ - receivenotifications - sendnotifications - now <- getCurrentTime - if diffUTCTime now starttime > 300 - then do - brokendebug thisThread ["XMPP connection lost; reconnecting"] - loop c now - else do - brokendebug thisThread ["XMPP connection failed; will retry"] - threadDelaySeconds (Seconds 300) - loop c =<< getCurrentTime - - sendnotifications = forever $ do - us <- liftIO $ waitPush pushnotifier - putStanza $ gitAnnexPresence $ encodePushNotification us - - receivenotifications = forever $ do - s <- getStanza - liftIO $ brokendebug thisThread ["received XMPP:", show s] - case s of - ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) -> - liftIO $ pull st dstatus $ - concat $ catMaybes $ - map decodePushNotification $ - presencePayloads p - _ -> noop +xmppClient :: PushNotifier -> ([String] -> IO ()) -> ([UUID] -> IO ()) -> Assistant () +xmppClient pushnotifier iodebug iopull = do + v <- liftAnnex getXMPPCreds + case v of + Nothing -> noop + Just c -> liftIO $ loop c =<< getCurrentTime + where + loop c starttime = do + void $ connectXMPP c $ \jid -> do + fulljid <- bindJID jid + liftIO $ iodebug ["XMPP connected", show fulljid] + putStanza $ gitAnnexPresence gitAnnexSignature + s <- getSession + _ <- liftIO $ forkIO $ void $ runXMPP s $ + receivenotifications + sendnotifications + now <- getCurrentTime + if diffUTCTime now starttime > 300 + then do + iodebug ["XMPP connection lost; reconnecting"] + loop c now + else do + iodebug ["XMPP connection failed; will retry"] + threadDelaySeconds (Seconds 300) + loop c =<< getCurrentTime + sendnotifications = forever $ do + us <- liftIO $ waitPush pushnotifier + putStanza $ gitAnnexPresence $ encodePushNotification us + receivenotifications = forever $ do + s <- getStanza + liftIO $ iodebug ["received XMPP:", show s] + case s of + ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) -> + liftIO $ iopull $ concat $ catMaybes $ + map decodePushNotification $ + presencePayloads p + _ -> noop {- We only pull from one remote out of the set listed in the push - notification, as an optimisation. @@ -89,18 +88,18 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ liftIO $ - fully up-to-date. If that happens, the pushRetryThread will come along - and retry the push, and we'll get another notification once it succeeds, - and pull again. -} -pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO () -pull _ _ [] = noop -pull st dstatus us = do - rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus - brokendebug thisThread $ "push notification for" : - map (fromUUID . Remote.uuid ) rs - pullone rs =<< runThreadState st (inRepo Git.Branch.current) - where - matching r = Remote.uuid r `S.member` s - s = S.fromList us +pull :: [UUID] -> Assistant () +pull [] = noop +pull us = do + rs <- filter matching . syncRemotes <$> daemonStatus + debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs + st <- getAssistant threadState + liftIO . pullone st rs =<< liftAnnex (inRepo Git.Branch.current) + where + matching r = Remote.uuid r `S.member` s + s = S.fromList us - pullone [] _ = noop - pullone (r:rs) branch = - unlessM (all id . fst <$> manualPull st branch [r]) $ - pullone rs branch + pullone _ [] _ = noop + pullone st (r:rs) branch = + unlessM (all id . fst <$> manualPull st branch [r]) $ + pullone st rs branch diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index a15c0e152..811314651 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -11,7 +11,6 @@ import Assistant.Common import Assistant.Commits import Assistant.Pushes import Assistant.Alert -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Sync import Utility.ThreadScheduler @@ -24,52 +23,49 @@ thisThread :: ThreadName thisThread = "Pusher" {- This thread retries pushes that failed before. -} -pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread -pushRetryThread st dstatus pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds halfhour) $ do +pushRetryThread :: NamedThread +pushRetryThread = NamedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do -- We already waited half an hour, now wait until there are failed -- pushes to retry. - topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) + pushmap <- getAssistant failedPushMap + topush <- liftIO $ getFailedPushesBefore pushmap (fromIntegral halfhour) unless (null topush) $ do - brokendebug thisThread - [ "retrying" - , show (length topush) - , "failed pushes" - ] - now <- getCurrentTime - void $ alertWhile dstatus (pushRetryAlert topush) $ + debug ["retrying", show (length topush), "failed pushes"] + now <- liftIO $ getCurrentTime + st <- getAssistant threadState + pushnotifier <- getAssistant pushNotifier + dstatus <- getAssistant daemonStatusHandle + void $ liftIO $ alertWhile dstatus (pushRetryAlert topush) $ pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush where halfhour = 1800 - thread = NamedThread thisThread {- This thread pushes git commits out to remotes soon after they are made. -} -pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread -pushThread st dstatus commitchan pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds 2) $ do +pushThread :: NamedThread +pushThread = NamedThread "Pusher" $ runEvery (Seconds 2) <~> do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made - commits <- getCommits commitchan + commits <- getCommits <<~ commitChan -- Now see if now's a good time to push. if shouldPush commits then do - remotes <- filter pushable . syncRemotes - <$> getDaemonStatus dstatus + remotes <- filter pushable . syncRemotes <$> daemonStatus unless (null remotes) $ do - now <- getCurrentTime - void $ alertWhile dstatus (pushAlert remotes) $ + now <- liftIO $ getCurrentTime + st <- getAssistant threadState + pushmap <- getAssistant failedPushMap + pushnotifier <- getAssistant pushNotifier + dstatus <- getAssistant daemonStatusHandle + void $ liftIO $ alertWhile dstatus (pushAlert remotes) $ pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes else do - brokendebug thisThread - [ "delaying push of" - , show (length commits) - , "commits" - ] - refillCommits commitchan commits - where - thread = NamedThread thisThread - pushable r - | Remote.specialRemote r = False - | Remote.readonly r = False - | otherwise = True + debug ["delaying push of", show (length commits), "commits"] + flip refillCommits commits <<~ commitChan + where + pushable r + | Remote.specialRemote r = False + | Remote.readonly r = False + | otherwise = True {- Decide if now is a good time to push to remotes. - diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs index 28df518aa..8c46a79fa 100644 --- a/Assistant/Threads/TransferScanner.hs +++ b/Assistant/Threads/TransferScanner.hs @@ -10,7 +10,6 @@ module Assistant.Threads.TransferScanner where import Assistant.Common import Assistant.ScanRemotes import Assistant.TransferQueue -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Alert import Assistant.Drop @@ -27,64 +26,64 @@ import Annex.Wanted import qualified Data.Set as S -thisThread :: ThreadName -thisThread = "TransferScanner" - {- This thread waits until a remote needs to be scanned, to find transfers - that need to be made, to keep data in sync. -} -transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> NamedThread -transferScannerThread st dstatus scanremotes transferqueue = thread $ liftIO $ do +transferScannerThread :: NamedThread +transferScannerThread = NamedThread "TransferScanner" $ do startupScan go S.empty - where - thread = NamedThread thisThread - go scanned = do - threadDelaySeconds (Seconds 2) - (rs, infos) <- unzip <$> getScanRemote scanremotes - if any fullScan infos || any (`S.notMember` scanned) rs - then do - expensiveScan st dstatus transferqueue rs - go $ scanned `S.union` S.fromList rs - else do - mapM_ (failedTransferScan st dstatus transferqueue) rs - go scanned - {- All available remotes are scanned in full on startup, - - for multiple reasons, including: - - - - * This may be the first run, and there may be remotes - - already in place, that need to be synced. - - * We may have run before, and scanned a remote, but - - only been in a subdirectory of the git remote, and so - - not synced it all. - - * We may have run before, and had transfers queued, - - and then the system (or us) crashed, and that info was - - lost. - -} - startupScan = addScanRemotes scanremotes True - =<< syncRemotes <$> getDaemonStatus dstatus + where + go scanned = do + liftIO $ threadDelaySeconds (Seconds 2) + (rs, infos) <- unzip <$> getScanRemote <<~ scanRemoteMap + if any fullScan infos || any (`S.notMember` scanned) rs + then do + expensiveScan rs + go $ scanned `S.union` S.fromList rs + else do + mapM_ failedTransferScan rs + go scanned + {- All available remotes are scanned in full on startup, + - for multiple reasons, including: + - + - * This may be the first run, and there may be remotes + - already in place, that need to be synced. + - * We may have run before, and scanned a remote, but + - only been in a subdirectory of the git remote, and so + - not synced it all. + - * We may have run before, and had transfers queued, + - and then the system (or us) crashed, and that info was + - lost. + -} + startupScan = do + scanremotes <- getAssistant scanRemoteMap + liftIO . addScanRemotes scanremotes True + =<< syncRemotes <$> daemonStatus {- This is a cheap scan for failed transfers involving a remote. -} -failedTransferScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO () -failedTransferScan st dstatus transferqueue r = do - failed <- runThreadState st $ getFailedTransfers (Remote.uuid r) - runThreadState st $ mapM_ removeFailedTransfer $ map fst failed +failedTransferScan :: Remote -> Assistant () +failedTransferScan r = do + failed <- liftAnnex $ getFailedTransfers (Remote.uuid r) + liftAnnex $ mapM_ removeFailedTransfer $ map fst failed mapM_ retry failed - where - retry (t, info) - | transferDirection t == Download = do - {- Check if the remote still has the key. - - If not, relies on the expensiveScan to - - get it queued from some other remote. -} - whenM (runThreadState st $ remoteHas r $ transferKey t) $ - requeue t info - | otherwise = do - {- The Transferrer checks when uploading - - that the remote doesn't already have the - - key, so it's not redundantly checked - - here. -} + where + retry (t, info) + | transferDirection t == Download = do + {- Check if the remote still has the key. + - If not, relies on the expensiveScan to + - get it queued from some other remote. -} + whenM (liftAnnex $ remoteHas r $ transferKey t) $ requeue t info - requeue t info = queueTransferWhenSmall + | otherwise = do + {- The Transferrer checks when uploading + - that the remote doesn't already have the + - key, so it's not redundantly checked here. -} + requeue t info + requeue t info = do + transferqueue <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ queueTransferWhenSmall transferqueue dstatus (associatedFile info) t r {- This is a expensive scan through the full git work tree, finding @@ -98,42 +97,45 @@ failedTransferScan st dstatus transferqueue r = do - TODO: It would be better to first drop as much as we can, before - transferring much, to minimise disk use. -} -expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> [Remote] -> IO () -expensiveScan st dstatus transferqueue rs = unless onlyweb $ do - brokendebug thisThread ["starting scan of", show visiblers] - void $ alertWhile dstatus (scanAlert visiblers) $ do - g <- runThreadState st gitRepo - (files, cleanup) <- LsFiles.inRepo [] g +expensiveScan :: [Remote] -> Assistant () +expensiveScan rs = unless onlyweb $ do + debug ["starting scan of", show visiblers] + dstatus <- getAssistant daemonStatusHandle + void $ alertWhile dstatus (scanAlert visiblers) <~> do + g <- liftAnnex gitRepo + (files, cleanup) <- liftIO $ LsFiles.inRepo [] g forM_ files $ \f -> do - ts <- runThreadState st $ - ifAnnexed f (findtransfers f) (return []) + ts <- liftAnnex $ + ifAnnexed f (findtransfers dstatus f) (return []) mapM_ (enqueue f) ts - void cleanup + void $ liftIO cleanup return True - brokendebug thisThread ["finished scan of", show visiblers] - where - onlyweb = all (== webUUID) $ map Remote.uuid rs - visiblers = let rs' = filter (not . Remote.readonly) rs - in if null rs' then rs else rs' - enqueue f (r, t) = do - brokendebug thisThread ["queuing", show t] - queueTransferWhenSmall transferqueue dstatus (Just f) t r - findtransfers f (key, _) = do - locs <- loggedLocations key - {- The syncable remotes may have changed since this - - scan began. -} - syncrs <- liftIO $ syncRemotes <$> getDaemonStatus dstatus - present <- inAnnex key + debug ["finished scan of", show visiblers] + where + onlyweb = all (== webUUID) $ map Remote.uuid rs + visiblers = let rs' = filter (not . Remote.readonly) rs + in if null rs' then rs else rs' + enqueue f (r, t) = do + debug ["queuing", show t] + transferqueue <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ queueTransferWhenSmall transferqueue dstatus (Just f) t r + findtransfers dstatus f (key, _) = do + locs <- loggedLocations key + {- The syncable remotes may have changed since this + - scan began. -} + syncrs <- liftIO $ syncRemotes <$> getDaemonStatus dstatus + present <- inAnnex key - handleDrops' locs syncrs present key (Just f) + handleDrops' locs syncrs present key (Just f) - let slocs = S.fromList locs - let use a = return $ catMaybes $ map (a key slocs) syncrs - if present - then filterM (wantSend (Just f) . Remote.uuid . fst) - =<< use (genTransfer Upload False) - else ifM (wantGet $ Just f) - ( use (genTransfer Download True) , return [] ) + let slocs = S.fromList locs + let use a = return $ catMaybes $ map (a key slocs) syncrs + if present + then filterM (wantSend (Just f) . Remote.uuid . fst) + =<< use (genTransfer Upload False) + else ifM (wantGet $ Just f) + ( use (genTransfer Download True) , return [] ) genTransfer :: Direction -> Bool -> Key -> S.Set UUID -> Remote -> Maybe (Remote, Transfer) genTransfer direction want key slocs r diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index 33d4196da..7287c8c12 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -159,10 +159,10 @@ onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file) ensurestaged link daemonstatus | scanComplete daemonstatus = addlink link | otherwise = case filestatus of - Just s | changedrecently s -> liftIO noChange + Just s | not (changedrecently s) -> liftIO noChange _ -> addlink link where - changedrecently s = not $ + changedrecently s = afterLastDaemonRun (statusChangeTime s) daemonstatus {- For speed, tries to reuse the existing blob for symlink target. -} |