aboutsummaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-10-29 11:40:22 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-10-29 11:40:22 -0400
commitf901112e1ce30f43dc7294e0bd0616bb02556500 (patch)
tree92ab6d6f220ea21e0cc7feeff6caca52d4d2b677 /Assistant
parent710dfa7e3ec897d6f02930540b10bb303e3a9c91 (diff)
converted 6 more threads
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/Threads/Committer.hs215
-rw-r--r--Assistant/Threads/ConfigMonitor.hs72
-rw-r--r--Assistant/Threads/Merger.hs72
-rw-r--r--Assistant/Threads/PushNotifier.hs119
-rw-r--r--Assistant/Threads/Pusher.hs58
-rw-r--r--Assistant/Threads/TransferScanner.hs164
-rw-r--r--Assistant/Threads/Watcher.hs4
7 files changed, 343 insertions, 361 deletions
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
index 11c6dc584..2ab693f05 100644
--- a/Assistant/Threads/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -13,7 +13,6 @@ import Assistant.Common
import Assistant.Changes
import Assistant.Commits
import Assistant.Alert
-import Assistant.ThreadedMonad
import Assistant.Threads.Watcher
import Assistant.TransferQueue
import Assistant.DaemonStatus
@@ -37,48 +36,40 @@ import Data.Tuple.Utils
import qualified Data.Set as S
import Data.Either
-thisThread :: ThreadName
-thisThread = "Committer"
-
{- This thread makes git commits at appropriate times. -}
-commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread
-commitThread st changechan commitchan transferqueue dstatus = thread $ liftIO $ do
- delayadd <- runThreadState st $
+commitThread :: NamedThread
+commitThread = NamedThread "Committer" $ do
+ delayadd <- liftAnnex $
maybe delayaddDefault (Just . Seconds) . readish
<$> getConfig (annexConfig "delayadd") ""
- runEvery (Seconds 1) $ do
+ runEvery (Seconds 1) <~> do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
- changes <- getChanges changechan
+ changes <- getChanges <<~ changeChan
-- Now see if now's a good time to commit.
- time <- getCurrentTime
+ time <- liftIO getCurrentTime
if shouldCommit time changes
then do
- readychanges <- handleAdds delayadd st changechan transferqueue dstatus changes
+ readychanges <- handleAdds delayadd changes
if shouldCommit time readychanges
then do
- brokendebug thisThread
+ debug
[ "committing"
, show (length readychanges)
, "changes"
]
- void $ alertWhile dstatus commitAlert $
- runThreadState st commitStaged
- recordCommit commitchan
+ dstatus <- getAssistant daemonStatusHandle
+ void $ alertWhile dstatus commitAlert <~>
+ liftAnnex commitStaged
+ recordCommit <<~ commitChan
else refill readychanges
else refill changes
- where
- thread = NamedThread thisThread
- refill [] = noop
- refill cs = do
- brokendebug thisThread
- [ "delaying commit of"
- , show (length cs)
- , "changes"
- ]
- refillChanges changechan cs
-
+ where
+ refill [] = noop
+ refill cs = do
+ debug ["delaying commit of", show (length cs), "changes"]
+ flip refillChanges cs <<~ changeChan
commitStaged :: Annex Bool
commitStaged = do
@@ -99,12 +90,12 @@ commitStaged = do
- each other out, etc. Git returns nonzero on those,
- so don't propigate out commit failures. -}
return True
- where
- nomessage ps
- | Git.Version.older "1.7.2" = Param "-m"
- : Param "autocommit" : ps
- | otherwise = Param "--allow-empty-message"
- : Param "-m" : Param "" : ps
+ where
+ nomessage ps
+ | Git.Version.older "1.7.2" = Param "-m"
+ : Param "autocommit" : ps
+ | otherwise = Param "--allow-empty-message"
+ : Param "-m" : Param "" : ps
{- Decide if now is a good time to make a commit.
- Note that the list of change times has an undefined order.
@@ -118,9 +109,9 @@ shouldCommit now changes
| len > 10000 = True -- avoid bloating queue too much
| length (filter thisSecond changes) < 10 = True
| otherwise = False -- batch activity
- where
- len = length changes
- thisSecond c = now `diffUTCTime` changeTime c <= 1
+ where
+ len = length changes
+ thisSecond c = now `diffUTCTime` changeTime c <= 1
{- OSX needs a short delay after a file is added before locking it down,
- as pasting a file seems to try to set file permissions or otherwise
@@ -152,77 +143,82 @@ delayaddDefault = Nothing
- Any pending adds that are not ready yet are put back into the ChangeChan,
- where they will be retried later.
-}
-handleAdds :: Maybe Seconds -> ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
-handleAdds delayadd st changechan transferqueue dstatus cs = returnWhen (null incomplete) $ do
+handleAdds :: Maybe Seconds -> [Change] -> Assistant [Change]
+handleAdds delayadd cs = returnWhen (null incomplete) $ do
let (pending, inprocess) = partition isPendingAddChange incomplete
pending' <- findnew pending
- (postponed, toadd) <- partitionEithers <$> safeToAdd delayadd st pending' inprocess
+ (postponed, toadd) <- partitionEithers <$> safeToAdd delayadd pending' inprocess
unless (null postponed) $
- refillChanges changechan postponed
+ flip refillChanges postponed <<~ changeChan
returnWhen (null toadd) $ do
added <- catMaybes <$> forM toadd add
if DirWatcher.eventsCoalesce || null added
then return $ added ++ otherchanges
else do
- r <- handleAdds delayadd st changechan transferqueue dstatus
- =<< getChanges changechan
+ r <- handleAdds delayadd
+ =<< getChanges <<~ changeChan
return $ r ++ added ++ otherchanges
- where
- (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs
+ where
+ (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs
- findnew [] = return []
- findnew pending@(exemplar:_) = do
- (!newfiles, cleanup) <- runThreadState st $
- inRepo (Git.LsFiles.notInRepo False $ map changeFile pending)
- void cleanup
- -- note: timestamp info is lost here
- let ts = changeTime exemplar
- return $ map (PendingAddChange ts) newfiles
+ findnew [] = return []
+ findnew pending@(exemplar:_) = do
+ (!newfiles, cleanup) <- liftAnnex $
+ inRepo (Git.LsFiles.notInRepo False $ map changeFile pending)
+ void $ liftIO cleanup
+ -- note: timestamp info is lost here
+ let ts = changeTime exemplar
+ return $ map (PendingAddChange ts) newfiles
+
+ returnWhen c a
+ | c = return otherchanges
+ | otherwise = a
- returnWhen c a
- | c = return otherchanges
- | otherwise = a
+ add :: Change -> Assistant (Maybe Change)
+ add change@(InProcessAddChange { keySource = ks }) = do
+ dstatus <- getAssistant daemonStatusHandle
+ alertWhile' dstatus (addFileAlert $ keyFilename ks) <~> add' change ks
+ add _ = return Nothing
- add :: Change -> IO (Maybe Change)
- add change@(InProcessAddChange { keySource = ks }) =
- alertWhile' dstatus (addFileAlert $ keyFilename ks) $
- liftM ret $ catchMaybeIO $
- sanitycheck ks $ runThreadState st $ do
- showStart "add" $ keyFilename ks
- key <- Command.Add.ingest ks
- done (finishedChange change) (keyFilename ks) key
- where
- {- Add errors tend to be transient and will
- - be automatically dealt with, so don't
- - pass to the alert code. -}
- ret (Just j@(Just _)) = (True, j)
- ret _ = (True, Nothing)
- add _ = return Nothing
+ add' change ks = liftM ret $ catchMaybeIO <~> do
+ sanitycheck ks $ do
+ key <- liftAnnex $ do
+ showStart "add" $ keyFilename ks
+ Command.Add.ingest ks
+ done (finishedChange change) (keyFilename ks) key
+ where
+ {- Add errors tend to be transient and will be automatically
+ - dealt with, so don't pass to the alert code. -}
+ ret (Just j@(Just _)) = (True, j)
+ ret _ = (True, Nothing)
- done _ _ Nothing = do
- showEndFail
- return Nothing
- done change file (Just key) = do
- link <- Command.Add.link file key True
- when DirWatcher.eventsCoalesce $ do
+ done _ _ Nothing = do
+ liftAnnex showEndFail
+ return Nothing
+ done change file (Just key) = do
+ link <- liftAnnex $ Command.Add.link file key True
+ when DirWatcher.eventsCoalesce $
+ liftAnnex $ do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
- queueTransfers Next transferqueue dstatus key (Just file) Upload
- showEndOk
- return $ Just change
+ showEndOk
+ transferqueue <- getAssistant transferQueue
+ dstatus <- getAssistant daemonStatusHandle
+ liftAnnex $ queueTransfers Next transferqueue dstatus key (Just file) Upload
+ return $ Just change
- {- Check that the keysource's keyFilename still exists,
- - and is still a hard link to its contentLocation,
- - before ingesting it. -}
- sanitycheck keysource a = do
- fs <- getSymbolicLinkStatus $ keyFilename keysource
- ks <- getSymbolicLinkStatus $ contentLocation keysource
- if deviceID ks == deviceID fs && fileID ks == fileID fs
- then a
- else return Nothing
+ {- Check that the keysource's keyFilename still exists,
+ - and is still a hard link to its contentLocation,
+ - before ingesting it. -}
+ sanitycheck keysource a = do
+ fs <- liftIO $ getSymbolicLinkStatus $ keyFilename keysource
+ ks <- liftIO $ getSymbolicLinkStatus $ contentLocation keysource
+ if deviceID ks == deviceID fs && fileID ks == fileID fs
+ then a
+ else return Nothing
{- Files can Either be Right to be added now,
- or are unsafe, and must be Left for later.
@@ -230,11 +226,11 @@ handleAdds delayadd st changechan transferqueue dstatus cs = returnWhen (null in
- Check by running lsof on the temp directory, which
- the KeySources are locked down in.
-}
-safeToAdd :: Maybe Seconds -> ThreadState -> [Change] -> [Change] -> IO [Either Change Change]
-safeToAdd _ _ [] [] = return []
-safeToAdd delayadd st pending inprocess = do
- maybe noop threadDelaySeconds delayadd
- runThreadState st $ do
+safeToAdd :: Maybe Seconds -> [Change] -> [Change] -> Assistant [Either Change Change]
+safeToAdd _ [] [] = return []
+safeToAdd delayadd pending inprocess = do
+ maybe noop (liftIO . threadDelaySeconds) delayadd
+ liftAnnex $ do
keysources <- mapM Command.Add.lockDown (map changeFile pending)
let inprocess' = map mkinprocess (zip pending keysources)
tmpdir <- fromRepo gitAnnexTmpDir
@@ -250,25 +246,24 @@ safeToAdd delayadd st pending inprocess = do
mapM_ canceladd $ lefts checked
allRight $ rights checked
else return checked
- where
- check openfiles change@(InProcessAddChange { keySource = ks })
- | S.member (contentLocation ks) openfiles = Left change
- check _ change = Right change
+ where
+ check openfiles change@(InProcessAddChange { keySource = ks })
+ | S.member (contentLocation ks) openfiles = Left change
+ check _ change = Right change
- mkinprocess (c, ks) = InProcessAddChange
- { changeTime = changeTime c
- , keySource = ks
- }
+ mkinprocess (c, ks) = InProcessAddChange
+ { changeTime = changeTime c
+ , keySource = ks
+ }
- canceladd (InProcessAddChange { keySource = ks }) = do
- warning $ keyFilename ks
- ++ " still has writers, not adding"
- -- remove the hard link
- void $ liftIO $ tryIO $
- removeFile $ contentLocation ks
- canceladd _ = noop
+ canceladd (InProcessAddChange { keySource = ks }) = do
+ warning $ keyFilename ks
+ ++ " still has writers, not adding"
+ -- remove the hard link
+ void $ liftIO $ tryIO $ removeFile $ contentLocation ks
+ canceladd _ = noop
- openwrite (_file, mode, _pid) =
- mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite
+ openwrite (_file, mode, _pid) =
+ mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite
- allRight = return . map Right
+ allRight = return . map Right
diff --git a/Assistant/Threads/ConfigMonitor.hs b/Assistant/Threads/ConfigMonitor.hs
index 2d5df48dd..fe98b10e8 100644
--- a/Assistant/Threads/ConfigMonitor.hs
+++ b/Assistant/Threads/ConfigMonitor.hs
@@ -9,7 +9,6 @@ module Assistant.Threads.ConfigMonitor where
import Assistant.Common
import Assistant.BranchChange
-import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Commits
import Utility.ThreadScheduler
@@ -19,10 +18,8 @@ import Logs.Remote
import Logs.PreferredContent
import Logs.Group
import Remote.List (remoteListRefresh)
-import qualified Git
import qualified Git.LsTree as LsTree
import qualified Annex.Branch
-import qualified Annex
import qualified Data.Set as S
@@ -37,26 +34,22 @@ thisThread = "ConfigMonitor"
- if the branch has not changed in a while, configuration changes will
- be detected immediately.
-}
-configMonitorThread :: ThreadState -> DaemonStatusHandle -> BranchChangeHandle -> CommitChan -> NamedThread
-configMonitorThread st dstatus branchhandle commitchan = thread $ liftIO $ do
- r <- runThreadState st Annex.gitRepo
- go r =<< getConfigs r
- where
- thread = NamedThread thisThread
-
- go r old = do
- threadDelaySeconds (Seconds 60)
- waitBranchChange branchhandle
- new <- getConfigs r
- when (old /= new) $ do
- let changedconfigs = new `S.difference` old
- brokendebug thisThread $ "reloading config" :
- map fst (S.toList changedconfigs)
- reloadConfigs st dstatus changedconfigs
- {- Record a commit to get this config
- - change pushed out to remotes. -}
- recordCommit commitchan
- go r new
+configMonitorThread :: NamedThread
+configMonitorThread = NamedThread "ConfigMonitor" $ loop =<< getConfigs
+ where
+ loop old = do
+ liftIO $ threadDelaySeconds (Seconds 60)
+ waitBranchChange <<~ branchChangeHandle
+ new <- getConfigs
+ when (old /= new) $ do
+ let changedconfigs = new `S.difference` old
+ debug $ "reloading config" :
+ map fst (S.toList changedconfigs)
+ reloadConfigs new
+ {- Record a commit to get this config
+ - change pushed out to remotes. -}
+ recordCommit <<~ commitChan
+ loop new
{- Config files, and their checksums. -}
type Configs = S.Set (FilePath, String)
@@ -73,22 +66,23 @@ configFilesActions =
, (preferredContentLog, noop)
]
-reloadConfigs :: ThreadState -> DaemonStatusHandle -> Configs -> IO ()
-reloadConfigs st dstatus changedconfigs = runThreadState st $ do
- sequence_ as
- void preferredContentMapLoad
+reloadConfigs :: Configs -> Assistant ()
+reloadConfigs changedconfigs = do
+ liftAnnex $ do
+ sequence_ as
+ void preferredContentMapLoad
{- Changes to the remote log, or the trust log, can affect the
- syncRemotes list -}
- when (Logs.Remote.remoteLog `elem` fs || Logs.Trust.trustLog `elem` fs) $
- updateSyncRemotes dstatus
- where
- (fs, as) = unzip $ filter (flip S.member changedfiles . fst)
- configFilesActions
- changedfiles = S.map fst changedconfigs
+ when (Logs.Remote.remoteLog `elem` fs || Logs.Trust.trustLog `elem` fs) $
+ liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle
+ where
+ (fs, as) = unzip $ filter (flip S.member changedfiles . fst)
+ configFilesActions
+ changedfiles = S.map fst changedconfigs
-getConfigs :: Git.Repo -> IO Configs
-getConfigs r = S.fromList . map extract
- <$> LsTree.lsTreeFiles Annex.Branch.fullname files r
- where
- files = map fst configFilesActions
- extract treeitem = (LsTree.file treeitem, LsTree.sha treeitem)
+getConfigs :: Assistant Configs
+getConfigs = S.fromList . map extract
+ <$> liftAnnex (inRepo $ LsTree.lsTreeFiles Annex.Branch.fullname files)
+ where
+ files = map fst configFilesActions
+ extract treeitem = (LsTree.file treeitem, LsTree.sha treeitem)
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs
index 152b40361..a766c5977 100644
--- a/Assistant/Threads/Merger.hs
+++ b/Assistant/Threads/Merger.hs
@@ -8,8 +8,6 @@
module Assistant.Threads.Merger where
import Assistant.Common
-import Assistant.ThreadedMonad
-import Assistant.DaemonStatus
import Assistant.TransferQueue
import Assistant.BranchChange
import Utility.DirWatcher
@@ -24,36 +22,34 @@ thisThread = "Merger"
{- This thread watches for changes to .git/refs/, and handles incoming
- pushes. -}
-mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> NamedThread
-mergeThread st dstatus transferqueue branchchange = thread $ liftIO $ do
- g <- runThreadState st gitRepo
+mergeThread :: NamedThread
+mergeThread = NamedThread "Merger" $ do
+ g <- liftAnnex gitRepo
let dir = Git.localGitDir g </> "refs"
- createDirectoryIfMissing True dir
- let hook a = Just $ runHandler st dstatus transferqueue branchchange a
+ liftIO $ createDirectoryIfMissing True dir
+ let hook a = Just <$> asIO2 (runHandler a)
+ addhook <- hook onAdd
+ errhook <- hook onErr
let hooks = mkWatchHooks
- { addHook = hook onAdd
- , errHook = hook onErr
+ { addHook = addhook
+ , errHook = errhook
}
- void $ watchDir dir (const False) hooks id
- brokendebug thisThread ["watching", dir]
- where
- thread = NamedThread thisThread
+ void $ liftIO $ watchDir dir (const False) hooks id
+ debug ["watching", dir]
-type Handler = ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> FilePath -> Maybe FileStatus -> IO ()
+type Handler = FilePath -> Assistant ()
{- Runs an action handler.
-
- Exceptions are ignored, otherwise a whole thread could be crashed.
-}
-runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> Handler -> FilePath -> Maybe FileStatus -> IO ()
-runHandler st dstatus transferqueue branchchange handler file filestatus = void $
- either print (const noop) =<< tryIO go
- where
- go = handler st dstatus transferqueue branchchange file filestatus
+runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant ()
+runHandler handler file _filestatus =
+ either (liftIO . print) (const noop) =<< tryIO <~> handler file
{- Called when there's an error with inotify. -}
onErr :: Handler
-onErr _ _ _ _ msg _ = error msg
+onErr msg = error msg
{- Called when a new branch ref is written.
-
@@ -67,29 +63,29 @@ onErr _ _ _ _ msg _ = error msg
- ran are merged in.
-}
onAdd :: Handler
-onAdd st dstatus transferqueue branchchange file _
+onAdd file
| ".lock" `isSuffixOf` file = noop
| isAnnexBranch file = do
- branchChanged branchchange
- runThreadState st $
+ branchChanged <<~ branchChangeHandle
+ transferqueue <- getAssistant transferQueue
+ dstatus <- getAssistant daemonStatusHandle
+ liftAnnex $
whenM Annex.Branch.forceUpdate $
queueDeferredDownloads Later transferqueue dstatus
- | "/synced/" `isInfixOf` file = runThreadState st $ do
- mergecurrent =<< inRepo Git.Branch.current
+ | "/synced/" `isInfixOf` file = do
+ mergecurrent =<< liftAnnex (inRepo Git.Branch.current)
| otherwise = noop
- where
- changedbranch = fileToBranch file
- mergecurrent (Just current)
- | equivBranches changedbranch current = do
- liftIO $ brokendebug thisThread
- [ "merging"
- , show changedbranch
- , "into"
- , show current
- ]
- void $ inRepo $
- Git.Merge.mergeNonInteractive changedbranch
- mergecurrent _ = noop
+ where
+ changedbranch = fileToBranch file
+ mergecurrent (Just current)
+ | equivBranches changedbranch current = do
+ debug
+ [ "merging", show changedbranch
+ , "into", show current
+ ]
+ void $ liftAnnex $ inRepo $
+ Git.Merge.mergeNonInteractive changedbranch
+ mergecurrent _ = noop
equivBranches :: Git.Ref -> Git.Ref -> Bool
equivBranches x y = base x == base y
diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs
index 591c8b18b..d19369b8d 100644
--- a/Assistant/Threads/PushNotifier.hs
+++ b/Assistant/Threads/PushNotifier.hs
@@ -12,7 +12,6 @@ module Assistant.Threads.PushNotifier where
import Assistant.Common
import Assistant.XMPP
-import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Pushes
import Assistant.Sync
@@ -25,56 +24,56 @@ import qualified Data.Set as S
import qualified Git.Branch
import Data.Time.Clock
-thisThread :: ThreadName
-thisThread = "PushNotifier"
+pushNotifierThread :: NamedThread
+pushNotifierThread = NamedThread "PushNotifier" $ do
+ iodebug <- asIO debug
+ iopull <- asIO pull
+ pn <- getAssistant pushNotifier
+ controllerThread pn <~> xmppClient pn iodebug iopull
controllerThread :: PushNotifier -> IO () -> IO ()
-controllerThread pushnotifier a = forever $ do
- tid <- forkIO a
+controllerThread pushnotifier xmppclient = forever $ do
+ tid <- forkIO xmppclient
waitRestart pushnotifier
killThread tid
-pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread
-pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ liftIO $
- controllerThread pushnotifier $ do
- v <- runThreadState st $ getXMPPCreds
- case v of
- Nothing -> noop
- Just c -> loop c =<< getCurrentTime
- where
- loop c starttime = do
- void $ connectXMPP c $ \jid -> do
- fulljid <- bindJID jid
- liftIO $ brokendebug thisThread ["XMPP connected", show fulljid]
- putStanza $ gitAnnexPresence gitAnnexSignature
- s <- getSession
- _ <- liftIO $ forkIO $ void $ runXMPP s $
- receivenotifications
- sendnotifications
- now <- getCurrentTime
- if diffUTCTime now starttime > 300
- then do
- brokendebug thisThread ["XMPP connection lost; reconnecting"]
- loop c now
- else do
- brokendebug thisThread ["XMPP connection failed; will retry"]
- threadDelaySeconds (Seconds 300)
- loop c =<< getCurrentTime
-
- sendnotifications = forever $ do
- us <- liftIO $ waitPush pushnotifier
- putStanza $ gitAnnexPresence $ encodePushNotification us
-
- receivenotifications = forever $ do
- s <- getStanza
- liftIO $ brokendebug thisThread ["received XMPP:", show s]
- case s of
- ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) ->
- liftIO $ pull st dstatus $
- concat $ catMaybes $
- map decodePushNotification $
- presencePayloads p
- _ -> noop
+xmppClient :: PushNotifier -> ([String] -> IO ()) -> ([UUID] -> IO ()) -> Assistant ()
+xmppClient pushnotifier iodebug iopull = do
+ v <- liftAnnex getXMPPCreds
+ case v of
+ Nothing -> noop
+ Just c -> liftIO $ loop c =<< getCurrentTime
+ where
+ loop c starttime = do
+ void $ connectXMPP c $ \jid -> do
+ fulljid <- bindJID jid
+ liftIO $ iodebug ["XMPP connected", show fulljid]
+ putStanza $ gitAnnexPresence gitAnnexSignature
+ s <- getSession
+ _ <- liftIO $ forkIO $ void $ runXMPP s $
+ receivenotifications
+ sendnotifications
+ now <- getCurrentTime
+ if diffUTCTime now starttime > 300
+ then do
+ iodebug ["XMPP connection lost; reconnecting"]
+ loop c now
+ else do
+ iodebug ["XMPP connection failed; will retry"]
+ threadDelaySeconds (Seconds 300)
+ loop c =<< getCurrentTime
+ sendnotifications = forever $ do
+ us <- liftIO $ waitPush pushnotifier
+ putStanza $ gitAnnexPresence $ encodePushNotification us
+ receivenotifications = forever $ do
+ s <- getStanza
+ liftIO $ iodebug ["received XMPP:", show s]
+ case s of
+ ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) ->
+ liftIO $ iopull $ concat $ catMaybes $
+ map decodePushNotification $
+ presencePayloads p
+ _ -> noop
{- We only pull from one remote out of the set listed in the push
- notification, as an optimisation.
@@ -89,18 +88,18 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ liftIO $
- fully up-to-date. If that happens, the pushRetryThread will come along
- and retry the push, and we'll get another notification once it succeeds,
- and pull again. -}
-pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO ()
-pull _ _ [] = noop
-pull st dstatus us = do
- rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus
- brokendebug thisThread $ "push notification for" :
- map (fromUUID . Remote.uuid ) rs
- pullone rs =<< runThreadState st (inRepo Git.Branch.current)
- where
- matching r = Remote.uuid r `S.member` s
- s = S.fromList us
+pull :: [UUID] -> Assistant ()
+pull [] = noop
+pull us = do
+ rs <- filter matching . syncRemotes <$> daemonStatus
+ debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
+ st <- getAssistant threadState
+ liftIO . pullone st rs =<< liftAnnex (inRepo Git.Branch.current)
+ where
+ matching r = Remote.uuid r `S.member` s
+ s = S.fromList us
- pullone [] _ = noop
- pullone (r:rs) branch =
- unlessM (all id . fst <$> manualPull st branch [r]) $
- pullone rs branch
+ pullone _ [] _ = noop
+ pullone st (r:rs) branch =
+ unlessM (all id . fst <$> manualPull st branch [r]) $
+ pullone st rs branch
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index a15c0e152..811314651 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -11,7 +11,6 @@ import Assistant.Common
import Assistant.Commits
import Assistant.Pushes
import Assistant.Alert
-import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Sync
import Utility.ThreadScheduler
@@ -24,52 +23,49 @@ thisThread :: ThreadName
thisThread = "Pusher"
{- This thread retries pushes that failed before. -}
-pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread
-pushRetryThread st dstatus pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds halfhour) $ do
+pushRetryThread :: NamedThread
+pushRetryThread = NamedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do
-- We already waited half an hour, now wait until there are failed
-- pushes to retry.
- topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
+ pushmap <- getAssistant failedPushMap
+ topush <- liftIO $ getFailedPushesBefore pushmap (fromIntegral halfhour)
unless (null topush) $ do
- brokendebug thisThread
- [ "retrying"
- , show (length topush)
- , "failed pushes"
- ]
- now <- getCurrentTime
- void $ alertWhile dstatus (pushRetryAlert topush) $
+ debug ["retrying", show (length topush), "failed pushes"]
+ now <- liftIO $ getCurrentTime
+ st <- getAssistant threadState
+ pushnotifier <- getAssistant pushNotifier
+ dstatus <- getAssistant daemonStatusHandle
+ void $ liftIO $ alertWhile dstatus (pushRetryAlert topush) $
pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush
where
halfhour = 1800
- thread = NamedThread thisThread
{- This thread pushes git commits out to remotes soon after they are made. -}
-pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread
-pushThread st dstatus commitchan pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds 2) $ do
+pushThread :: NamedThread
+pushThread = NamedThread "Pusher" $ runEvery (Seconds 2) <~> do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
- commits <- getCommits commitchan
+ commits <- getCommits <<~ commitChan
-- Now see if now's a good time to push.
if shouldPush commits
then do
- remotes <- filter pushable . syncRemotes
- <$> getDaemonStatus dstatus
+ remotes <- filter pushable . syncRemotes <$> daemonStatus
unless (null remotes) $ do
- now <- getCurrentTime
- void $ alertWhile dstatus (pushAlert remotes) $
+ now <- liftIO $ getCurrentTime
+ st <- getAssistant threadState
+ pushmap <- getAssistant failedPushMap
+ pushnotifier <- getAssistant pushNotifier
+ dstatus <- getAssistant daemonStatusHandle
+ void $ liftIO $ alertWhile dstatus (pushAlert remotes) $
pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes
else do
- brokendebug thisThread
- [ "delaying push of"
- , show (length commits)
- , "commits"
- ]
- refillCommits commitchan commits
- where
- thread = NamedThread thisThread
- pushable r
- | Remote.specialRemote r = False
- | Remote.readonly r = False
- | otherwise = True
+ debug ["delaying push of", show (length commits), "commits"]
+ flip refillCommits commits <<~ commitChan
+ where
+ pushable r
+ | Remote.specialRemote r = False
+ | Remote.readonly r = False
+ | otherwise = True
{- Decide if now is a good time to push to remotes.
-
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
index 28df518aa..8c46a79fa 100644
--- a/Assistant/Threads/TransferScanner.hs
+++ b/Assistant/Threads/TransferScanner.hs
@@ -10,7 +10,6 @@ module Assistant.Threads.TransferScanner where
import Assistant.Common
import Assistant.ScanRemotes
import Assistant.TransferQueue
-import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Alert
import Assistant.Drop
@@ -27,64 +26,64 @@ import Annex.Wanted
import qualified Data.Set as S
-thisThread :: ThreadName
-thisThread = "TransferScanner"
-
{- This thread waits until a remote needs to be scanned, to find transfers
- that need to be made, to keep data in sync.
-}
-transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> NamedThread
-transferScannerThread st dstatus scanremotes transferqueue = thread $ liftIO $ do
+transferScannerThread :: NamedThread
+transferScannerThread = NamedThread "TransferScanner" $ do
startupScan
go S.empty
- where
- thread = NamedThread thisThread
- go scanned = do
- threadDelaySeconds (Seconds 2)
- (rs, infos) <- unzip <$> getScanRemote scanremotes
- if any fullScan infos || any (`S.notMember` scanned) rs
- then do
- expensiveScan st dstatus transferqueue rs
- go $ scanned `S.union` S.fromList rs
- else do
- mapM_ (failedTransferScan st dstatus transferqueue) rs
- go scanned
- {- All available remotes are scanned in full on startup,
- - for multiple reasons, including:
- -
- - * This may be the first run, and there may be remotes
- - already in place, that need to be synced.
- - * We may have run before, and scanned a remote, but
- - only been in a subdirectory of the git remote, and so
- - not synced it all.
- - * We may have run before, and had transfers queued,
- - and then the system (or us) crashed, and that info was
- - lost.
- -}
- startupScan = addScanRemotes scanremotes True
- =<< syncRemotes <$> getDaemonStatus dstatus
+ where
+ go scanned = do
+ liftIO $ threadDelaySeconds (Seconds 2)
+ (rs, infos) <- unzip <$> getScanRemote <<~ scanRemoteMap
+ if any fullScan infos || any (`S.notMember` scanned) rs
+ then do
+ expensiveScan rs
+ go $ scanned `S.union` S.fromList rs
+ else do
+ mapM_ failedTransferScan rs
+ go scanned
+ {- All available remotes are scanned in full on startup,
+ - for multiple reasons, including:
+ -
+ - * This may be the first run, and there may be remotes
+ - already in place, that need to be synced.
+ - * We may have run before, and scanned a remote, but
+ - only been in a subdirectory of the git remote, and so
+ - not synced it all.
+ - * We may have run before, and had transfers queued,
+ - and then the system (or us) crashed, and that info was
+ - lost.
+ -}
+ startupScan = do
+ scanremotes <- getAssistant scanRemoteMap
+ liftIO . addScanRemotes scanremotes True
+ =<< syncRemotes <$> daemonStatus
{- This is a cheap scan for failed transfers involving a remote. -}
-failedTransferScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO ()
-failedTransferScan st dstatus transferqueue r = do
- failed <- runThreadState st $ getFailedTransfers (Remote.uuid r)
- runThreadState st $ mapM_ removeFailedTransfer $ map fst failed
+failedTransferScan :: Remote -> Assistant ()
+failedTransferScan r = do
+ failed <- liftAnnex $ getFailedTransfers (Remote.uuid r)
+ liftAnnex $ mapM_ removeFailedTransfer $ map fst failed
mapM_ retry failed
- where
- retry (t, info)
- | transferDirection t == Download = do
- {- Check if the remote still has the key.
- - If not, relies on the expensiveScan to
- - get it queued from some other remote. -}
- whenM (runThreadState st $ remoteHas r $ transferKey t) $
- requeue t info
- | otherwise = do
- {- The Transferrer checks when uploading
- - that the remote doesn't already have the
- - key, so it's not redundantly checked
- - here. -}
+ where
+ retry (t, info)
+ | transferDirection t == Download = do
+ {- Check if the remote still has the key.
+ - If not, relies on the expensiveScan to
+ - get it queued from some other remote. -}
+ whenM (liftAnnex $ remoteHas r $ transferKey t) $
requeue t info
- requeue t info = queueTransferWhenSmall
+ | otherwise = do
+ {- The Transferrer checks when uploading
+ - that the remote doesn't already have the
+ - key, so it's not redundantly checked here. -}
+ requeue t info
+ requeue t info = do
+ transferqueue <- getAssistant transferQueue
+ dstatus <- getAssistant daemonStatusHandle
+ liftIO $ queueTransferWhenSmall
transferqueue dstatus (associatedFile info) t r
{- This is a expensive scan through the full git work tree, finding
@@ -98,42 +97,45 @@ failedTransferScan st dstatus transferqueue r = do
- TODO: It would be better to first drop as much as we can, before
- transferring much, to minimise disk use.
-}
-expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> [Remote] -> IO ()
-expensiveScan st dstatus transferqueue rs = unless onlyweb $ do
- brokendebug thisThread ["starting scan of", show visiblers]
- void $ alertWhile dstatus (scanAlert visiblers) $ do
- g <- runThreadState st gitRepo
- (files, cleanup) <- LsFiles.inRepo [] g
+expensiveScan :: [Remote] -> Assistant ()
+expensiveScan rs = unless onlyweb $ do
+ debug ["starting scan of", show visiblers]
+ dstatus <- getAssistant daemonStatusHandle
+ void $ alertWhile dstatus (scanAlert visiblers) <~> do
+ g <- liftAnnex gitRepo
+ (files, cleanup) <- liftIO $ LsFiles.inRepo [] g
forM_ files $ \f -> do
- ts <- runThreadState st $
- ifAnnexed f (findtransfers f) (return [])
+ ts <- liftAnnex $
+ ifAnnexed f (findtransfers dstatus f) (return [])
mapM_ (enqueue f) ts
- void cleanup
+ void $ liftIO cleanup
return True
- brokendebug thisThread ["finished scan of", show visiblers]
- where
- onlyweb = all (== webUUID) $ map Remote.uuid rs
- visiblers = let rs' = filter (not . Remote.readonly) rs
- in if null rs' then rs else rs'
- enqueue f (r, t) = do
- brokendebug thisThread ["queuing", show t]
- queueTransferWhenSmall transferqueue dstatus (Just f) t r
- findtransfers f (key, _) = do
- locs <- loggedLocations key
- {- The syncable remotes may have changed since this
- - scan began. -}
- syncrs <- liftIO $ syncRemotes <$> getDaemonStatus dstatus
- present <- inAnnex key
+ debug ["finished scan of", show visiblers]
+ where
+ onlyweb = all (== webUUID) $ map Remote.uuid rs
+ visiblers = let rs' = filter (not . Remote.readonly) rs
+ in if null rs' then rs else rs'
+ enqueue f (r, t) = do
+ debug ["queuing", show t]
+ transferqueue <- getAssistant transferQueue
+ dstatus <- getAssistant daemonStatusHandle
+ liftIO $ queueTransferWhenSmall transferqueue dstatus (Just f) t r
+ findtransfers dstatus f (key, _) = do
+ locs <- loggedLocations key
+ {- The syncable remotes may have changed since this
+ - scan began. -}
+ syncrs <- liftIO $ syncRemotes <$> getDaemonStatus dstatus
+ present <- inAnnex key
- handleDrops' locs syncrs present key (Just f)
+ handleDrops' locs syncrs present key (Just f)
- let slocs = S.fromList locs
- let use a = return $ catMaybes $ map (a key slocs) syncrs
- if present
- then filterM (wantSend (Just f) . Remote.uuid . fst)
- =<< use (genTransfer Upload False)
- else ifM (wantGet $ Just f)
- ( use (genTransfer Download True) , return [] )
+ let slocs = S.fromList locs
+ let use a = return $ catMaybes $ map (a key slocs) syncrs
+ if present
+ then filterM (wantSend (Just f) . Remote.uuid . fst)
+ =<< use (genTransfer Upload False)
+ else ifM (wantGet $ Just f)
+ ( use (genTransfer Download True) , return [] )
genTransfer :: Direction -> Bool -> Key -> S.Set UUID -> Remote -> Maybe (Remote, Transfer)
genTransfer direction want key slocs r
diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs
index 33d4196da..7287c8c12 100644
--- a/Assistant/Threads/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -159,10 +159,10 @@ onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file)
ensurestaged link daemonstatus
| scanComplete daemonstatus = addlink link
| otherwise = case filestatus of
- Just s | changedrecently s -> liftIO noChange
+ Just s | not (changedrecently s) -> liftIO noChange
_ -> addlink link
where
- changedrecently s = not $
+ changedrecently s =
afterLastDaemonRun (statusChangeTime s) daemonstatus
{- For speed, tries to reuse the existing blob for symlink target. -}