summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-10-29 02:21:04 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-10-29 02:21:04 -0400
commit579f63b6b756ca51b8f9fe53c3e668500718d91f (patch)
tree20039581df67e034ef434749d37de41e9802d21d /Assistant
parent040f68d628120e112e22bfb7100f9650dec940c8 (diff)
Assistant monad, stage 2.5
Converted several threads to run in the monad. Added a lot of useful combinators for working with the monad. Now the monad includes the name of the thread. Some debugging messages are disabled pending converting other threads.
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/Common.hs44
-rw-r--r--Assistant/DaemonStatus.hs4
-rw-r--r--Assistant/Monad.hs40
-rw-r--r--Assistant/Sync.hs6
-rw-r--r--Assistant/Threads/Committer.hs6
-rw-r--r--Assistant/Threads/ConfigMonitor.hs4
-rw-r--r--Assistant/Threads/DaemonStatus.hs27
-rw-r--r--Assistant/Threads/Merger.hs6
-rw-r--r--Assistant/Threads/MountWatcher.hs8
-rw-r--r--Assistant/Threads/NetWatcher.hs132
-rw-r--r--Assistant/Threads/PairListener.hs2
-rw-r--r--Assistant/Threads/PushNotifier.hs12
-rw-r--r--Assistant/Threads/Pusher.hs8
-rw-r--r--Assistant/Threads/SanityChecker.hs114
-rw-r--r--Assistant/Threads/TransferPoller.hs73
-rw-r--r--Assistant/Threads/TransferScanner.hs8
-rw-r--r--Assistant/Threads/TransferWatcher.hs13
-rw-r--r--Assistant/Threads/Transferrer.hs6
-rw-r--r--Assistant/Threads/Watcher.hs12
-rw-r--r--Assistant/Threads/WebApp.hs4
20 files changed, 273 insertions, 256 deletions
diff --git a/Assistant/Common.hs b/Assistant/Common.hs
index a6c6b8935..b46d3342a 100644
--- a/Assistant/Common.hs
+++ b/Assistant/Common.hs
@@ -10,7 +10,8 @@ module Assistant.Common (
ThreadName,
NamedThread(..),
runNamedThread,
- debug
+ debug,
+ brokendebug
) where
import Common.Annex as X
@@ -22,25 +23,28 @@ import System.Log.Logger
import qualified Control.Exception as E
type ThreadName = String
-data NamedThread = NamedThread ThreadName (IO ())
+data NamedThread = NamedThread ThreadName (Assistant ())
-debug :: ThreadName -> [String] -> IO ()
-debug threadname ws = debugM threadname $ unwords $ (threadname ++ ":") : ws
+brokendebug :: ThreadName -> [String] -> IO ()
+brokendebug _ _ = noop -- TODO remove this
+
+debug :: [String] -> Assistant ()
+debug ws = do
+ name <- getAssistant threadName
+ liftIO $ debugM name $ unwords $ (name ++ ":") : ws
runNamedThread :: NamedThread -> Assistant ()
-runNamedThread (NamedThread name a) = liftIO . go =<< getAssistant daemonStatus
- where
- go dstatus = do
- r <- E.try a :: IO (Either E.SomeException ())
- case r of
- Right _ -> noop
- Left e -> do
- let msg = unwords
- [ name
- , "crashed:"
- , show e
- ]
- hPutStrLn stderr msg
- -- TODO click to restart
- void $ addAlert dstatus $
- warningAlert name msg
+runNamedThread (NamedThread name a) = do
+ d <- getAssistant id
+ liftIO . go $ d { threadName = name }
+ where
+ go d = do
+ r <- E.try (runAssistant a d) :: IO (Either E.SomeException ())
+ case r of
+ Right _ -> noop
+ Left e -> do
+ let msg = unwords [name, "crashed:", show e]
+ hPutStrLn stderr msg
+ -- TODO click to restart
+ void $ addAlert (daemonStatusHandle d) $
+ warningAlert name msg
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index 60b560b90..08cdbaf55 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -181,8 +181,8 @@ adjustTransfersSTM dstatus a = do
putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) }
{- Alters a transfer's info, if the transfer is in the map. -}
-alterTransferInfo :: DaemonStatusHandle -> Transfer -> (TransferInfo -> TransferInfo) -> IO ()
-alterTransferInfo dstatus t a = updateTransferInfo' dstatus $ M.adjust a t
+alterTransferInfo :: Transfer -> (TransferInfo -> TransferInfo) -> DaemonStatusHandle -> IO ()
+alterTransferInfo t a dstatus = updateTransferInfo' dstatus $ M.adjust a t
{- Updates a transfer's info. Adds the transfer to the map if necessary,
- or if already present, updates it while preserving the old transferTid,
diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs
index fa982b45e..ef9e7a4cb 100644
--- a/Assistant/Monad.hs
+++ b/Assistant/Monad.hs
@@ -13,7 +13,12 @@ module Assistant.Monad (
newAssistantData,
runAssistant,
getAssistant,
- liftAnnex
+ liftAnnex,
+ (<~>),
+ (<<~),
+ daemonStatus,
+ asIO,
+ asIO2,
) where
import "mtl" Control.Monad.Reader
@@ -43,8 +48,9 @@ instance MonadBase IO Assistant where
liftBase = Assistant . liftBase
data AssistantData = AssistantData
- { threadState :: ThreadState
- , daemonStatus :: DaemonStatusHandle
+ { threadName :: String
+ , threadState :: ThreadState
+ , daemonStatusHandle :: DaemonStatusHandle
, scanRemoteMap :: ScanRemoteMap
, transferQueue :: TransferQueue
, transferSlots :: TransferSlots
@@ -57,7 +63,8 @@ data AssistantData = AssistantData
newAssistantData :: ThreadState -> DaemonStatusHandle -> IO AssistantData
newAssistantData st dstatus = AssistantData
- <$> pure st
+ <$> pure "main"
+ <*> pure st
<*> pure dstatus
<*> newScanRemoteMap
<*> newTransferQueue
@@ -81,3 +88,28 @@ liftAnnex :: Annex a -> Assistant a
liftAnnex a = do
st <- reader threadState
liftIO $ runThreadState st a
+
+{- Runs an IO action, passing it an IO action that runs an Assistant action. -}
+(<~>) :: (IO a -> IO b) -> Assistant a -> Assistant b
+io <~> a = do
+ d <- reader id
+ liftIO $ io $ runAssistant a d
+
+{- Creates an IO action that will run an Assistant action when run. -}
+asIO :: (a -> Assistant b) -> Assistant (a -> IO b)
+asIO a = do
+ d <- reader id
+ return $ \v -> runAssistant (a v) d
+
+{- Creates an IO action that will run an Assistant action when run. -}
+asIO2 :: (a -> b -> Assistant c) -> Assistant (a -> b -> IO c)
+asIO2 a = do
+ d <- reader id
+ return $ \v1 v2 -> runAssistant (a v1 v2) d
+
+{- Runs an IO action on a selected field of the AssistantData. -}
+(<<~) :: (a -> IO b) -> (AssistantData -> a) -> Assistant b
+io <<~ v = reader v >>= liftIO . io
+
+daemonStatus :: Assistant DaemonStatus
+daemonStatus = getDaemonStatus <<~ daemonStatusHandle
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs
index 8a9cf8985..bd23c7bb4 100644
--- a/Assistant/Sync.hs
+++ b/Assistant/Sync.hs
@@ -93,7 +93,7 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do
where
go _ Nothing _ _ _ = return True -- no branch, so nothing to do
go shouldretry (Just branch) g u rs = do
- debug threadname
+ brokendebug threadname
[ "pushing to"
, show rs
]
@@ -117,12 +117,12 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do
makemap l = M.fromList $ zip l (repeat now)
retry branch g u rs = do
- debug threadname [ "trying manual pull to resolve failed pushes" ]
+ brokendebug threadname [ "trying manual pull to resolve failed pushes" ]
void $ manualPull st (Just branch) rs
go False (Just branch) g u rs
fallback branch g u rs = do
- debug threadname
+ brokendebug threadname
[ "fallback pushing to"
, show rs
]
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
index ceb885100..11c6dc584 100644
--- a/Assistant/Threads/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -42,7 +42,7 @@ thisThread = "Committer"
{- This thread makes git commits at appropriate times. -}
commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread
-commitThread st changechan commitchan transferqueue dstatus = thread $ do
+commitThread st changechan commitchan transferqueue dstatus = thread $ liftIO $ do
delayadd <- runThreadState st $
maybe delayaddDefault (Just . Seconds) . readish
<$> getConfig (annexConfig "delayadd") ""
@@ -58,7 +58,7 @@ commitThread st changechan commitchan transferqueue dstatus = thread $ do
readychanges <- handleAdds delayadd st changechan transferqueue dstatus changes
if shouldCommit time readychanges
then do
- debug thisThread
+ brokendebug thisThread
[ "committing"
, show (length readychanges)
, "changes"
@@ -72,7 +72,7 @@ commitThread st changechan commitchan transferqueue dstatus = thread $ do
thread = NamedThread thisThread
refill [] = noop
refill cs = do
- debug thisThread
+ brokendebug thisThread
[ "delaying commit of"
, show (length cs)
, "changes"
diff --git a/Assistant/Threads/ConfigMonitor.hs b/Assistant/Threads/ConfigMonitor.hs
index 7450d5c6b..2d5df48dd 100644
--- a/Assistant/Threads/ConfigMonitor.hs
+++ b/Assistant/Threads/ConfigMonitor.hs
@@ -38,7 +38,7 @@ thisThread = "ConfigMonitor"
- be detected immediately.
-}
configMonitorThread :: ThreadState -> DaemonStatusHandle -> BranchChangeHandle -> CommitChan -> NamedThread
-configMonitorThread st dstatus branchhandle commitchan = thread $ do
+configMonitorThread st dstatus branchhandle commitchan = thread $ liftIO $ do
r <- runThreadState st Annex.gitRepo
go r =<< getConfigs r
where
@@ -50,7 +50,7 @@ configMonitorThread st dstatus branchhandle commitchan = thread $ do
new <- getConfigs r
when (old /= new) $ do
let changedconfigs = new `S.difference` old
- debug thisThread $ "reloading config" :
+ brokendebug thisThread $ "reloading config" :
map fst (S.toList changedconfigs)
reloadConfigs st dstatus changedconfigs
{- Record a commit to get this config
diff --git a/Assistant/Threads/DaemonStatus.hs b/Assistant/Threads/DaemonStatus.hs
index f3174c86f..946bf1b05 100644
--- a/Assistant/Threads/DaemonStatus.hs
+++ b/Assistant/Threads/DaemonStatus.hs
@@ -9,28 +9,21 @@ module Assistant.Threads.DaemonStatus where
import Assistant.Common
import Assistant.DaemonStatus
-import Assistant.ThreadedMonad
import Utility.ThreadScheduler
import Utility.NotificationBroadcaster
-thisThread :: ThreadName
-thisThread = "DaemonStatus"
-
{- This writes the daemon status to disk, when it changes, but no more
- frequently than once every ten minutes.
-}
-daemonStatusThread :: ThreadState -> DaemonStatusHandle -> NamedThread
-daemonStatusThread st dstatus = thread $ do
- notifier <- newNotificationHandle
- =<< changeNotifier <$> getDaemonStatus dstatus
+daemonStatusThread :: NamedThread
+daemonStatusThread = NamedThread "DaemonStatus" $ do
+ notifier <- liftIO . newNotificationHandle
+ =<< changeNotifier <$> daemonStatus
checkpoint
- runEvery (Seconds tenMinutes) $ do
- waitNotification notifier
+ runEvery (Seconds tenMinutes) <~> do
+ liftIO $ waitNotification notifier
checkpoint
- where
- thread = NamedThread thisThread
- checkpoint = do
- status <- getDaemonStatus dstatus
- file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile
- writeDaemonStatusFile file status
-
+ where
+ checkpoint = do
+ file <- liftAnnex $ fromRepo gitAnnexDaemonStatusFile
+ liftIO . writeDaemonStatusFile file =<< daemonStatus
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs
index e415a7562..152b40361 100644
--- a/Assistant/Threads/Merger.hs
+++ b/Assistant/Threads/Merger.hs
@@ -25,7 +25,7 @@ thisThread = "Merger"
{- This thread watches for changes to .git/refs/, and handles incoming
- pushes. -}
mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> NamedThread
-mergeThread st dstatus transferqueue branchchange = thread $ do
+mergeThread st dstatus transferqueue branchchange = thread $ liftIO $ do
g <- runThreadState st gitRepo
let dir = Git.localGitDir g </> "refs"
createDirectoryIfMissing True dir
@@ -35,7 +35,7 @@ mergeThread st dstatus transferqueue branchchange = thread $ do
, errHook = hook onErr
}
void $ watchDir dir (const False) hooks id
- debug thisThread ["watching", dir]
+ brokendebug thisThread ["watching", dir]
where
thread = NamedThread thisThread
@@ -81,7 +81,7 @@ onAdd st dstatus transferqueue branchchange file _
changedbranch = fileToBranch file
mergecurrent (Just current)
| equivBranches changedbranch current = do
- liftIO $ debug thisThread
+ liftIO $ brokendebug thisThread
[ "merging"
, show changedbranch
, "into"
diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs
index afd1c223c..f36bb8874 100644
--- a/Assistant/Threads/MountWatcher.hs
+++ b/Assistant/Threads/MountWatcher.hs
@@ -40,7 +40,7 @@ thisThread :: ThreadName
thisThread = "MountWatcher"
mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
-mountWatcherThread st handle scanremotes pushnotifier = thread $
+mountWatcherThread st handle scanremotes pushnotifier = thread $ liftIO $
#if WITH_DBUS
dbusThread st handle scanremotes pushnotifier
#else
@@ -93,7 +93,7 @@ checkMountMonitor client = do
case running of
[] -> startOneService client startableservices
(service:_) -> do
- debug thisThread [ "Using running DBUS service"
+ brokendebug thisThread [ "Using running DBUS service"
, service
, "to monitor mount events."
]
@@ -111,7 +111,7 @@ startOneService client (x:xs) = do
[toVariant x, toVariant (0 :: Word32)]
ifM (elem x <$> listServiceNames client)
( do
- debug thisThread [ "Started DBUS service"
+ brokendebug thisThread [ "Started DBUS service"
, x
, "to monitor mount events."
]
@@ -160,7 +160,7 @@ handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted =
handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO ()
handleMount st dstatus scanremotes pushnotifier dir = do
- debug thisThread ["detected mount of", dir]
+ brokendebug thisThread ["detected mount of", dir]
reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier)
=<< filter (Git.repoIsLocal . Remote.repo)
<$> remotesUnder st dstatus dir
diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs
index ed64541c3..2af880e02 100644
--- a/Assistant/Threads/NetWatcher.hs
+++ b/Assistant/Threads/NetWatcher.hs
@@ -11,9 +11,6 @@
module Assistant.Threads.NetWatcher where
import Assistant.Common
-import Assistant.ThreadedMonad
-import Assistant.DaemonStatus
-import Assistant.ScanRemotes
import Assistant.Sync
import Assistant.Pushes
import Utility.ThreadScheduler
@@ -29,72 +26,67 @@ import Data.Word (Word32)
#warning Building without dbus support; will poll for network connection changes
#endif
-thisThread :: ThreadName
-thisThread = "NetWatcher"
-
-netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
+netWatcherThread :: NamedThread
#if WITH_DBUS
-netWatcherThread st dstatus scanremotes pushnotifier = thread $
- dbusThread st dstatus scanremotes pushnotifier
+netWatcherThread = thread dbusThread
#else
-netWatcherThread _ _ _ _ = thread noop
+netWatcherThread = thread noop
#endif
- where
- thread = NamedThread thisThread
+ where
+ thread = NamedThread "NetWatcher"
{- This is a fallback for when dbus cannot be used to detect
- network connection changes, but it also ensures that
- any networked remotes that may have not been routable for a
- while (despite the local network staying up), are synced with
- periodically. -}
-netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
-netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $
- runEvery (Seconds 3600) $
- handleConnection st dstatus scanremotes pushnotifier
- where
- thread = NamedThread thisThread
+netWatcherFallbackThread :: NamedThread
+netWatcherFallbackThread = NamedThread "NetWatcherFallback" $
+ runEvery (Seconds 3600) <~> handleConnection
#if WITH_DBUS
-dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
-dbusThread st dstatus scanremotes pushnotifier =
- persistentClient getSystemAddress () onerr go
- where
- go client = ifM (checkNetMonitor client)
- ( do
- listenNMConnections client handleconn
- listenWicdConnections client handleconn
- , do
- runThreadState st $
- warning "No known network monitor available through dbus; falling back to polling"
- )
- handleconn = do
- debug thisThread ["detected network connection"]
- notifyRestart pushnotifier
- handleConnection st dstatus scanremotes pushnotifier
- onerr e _ = do
- runThreadState st $
- warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")"
- {- Wait, in hope that dbus will come back -}
- threadDelaySeconds (Seconds 60)
+dbusThread :: Assistant ()
+dbusThread = do
+ handleerr <- asIO2 onerr
+ runclient <- asIO go
+ liftIO $ persistentClient getSystemAddress () handleerr runclient
+ where
+ go client = ifM (checkNetMonitor client)
+ ( do
+ listenNMConnections client <~> handleconn
+ listenWicdConnections client <~> handleconn
+ , do
+ liftAnnex $
+ warning "No known network monitor available through dbus; falling back to polling"
+ )
+ handleconn = do
+ debug ["detected network connection"]
+ notifyRestart <<~ pushNotifier
+ handleConnection
+ onerr e _ = do
+ liftAnnex $
+ warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")"
+ {- Wait, in hope that dbus will come back -}
+ liftIO $ threadDelaySeconds (Seconds 60)
{- Examine the list of services connected to dbus, to see if there
- are any we can use to monitor network connections. -}
-checkNetMonitor :: Client -> IO Bool
+checkNetMonitor :: Client -> Assistant Bool
checkNetMonitor client = do
- running <- filter (`elem` [networkmanager, wicd])
+ running <- liftIO $ filter (`elem` [networkmanager, wicd])
<$> listServiceNames client
case running of
[] -> return False
(service:_) -> do
- debug thisThread [ "Using running DBUS service"
+ debug [ "Using running DBUS service"
, service
, "to monitor network connection events."
]
return True
- where
- networkmanager = "org.freedesktop.NetworkManager"
- wicd = "org.wicd.daemon"
+ where
+ networkmanager = "org.freedesktop.NetworkManager"
+ wicd = "org.wicd.daemon"
{- Listens for new NetworkManager connections. -}
listenNMConnections :: Client -> IO () -> IO ()
@@ -102,18 +94,18 @@ listenNMConnections client callback =
listen client matcher $ \event ->
when (Just True == anyM activeconnection (signalBody event)) $
callback
- where
- matcher = matchAny
- { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active"
- , matchMember = Just "PropertiesChanged"
- }
- nm_connection_activated = toVariant (2 :: Word32)
- nm_state_key = toVariant ("State" :: String)
- activeconnection v = do
- m <- fromVariant v
- vstate <- lookup nm_state_key $ dictionaryItems m
- state <- fromVariant vstate
- return $ state == nm_connection_activated
+ where
+ matcher = matchAny
+ { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active"
+ , matchMember = Just "PropertiesChanged"
+ }
+ nm_connection_activated = toVariant (2 :: Word32)
+ nm_state_key = toVariant ("State" :: String)
+ activeconnection v = do
+ m <- fromVariant v
+ vstate <- lookup nm_state_key $ dictionaryItems m
+ state <- fromVariant vstate
+ return $ state == nm_connection_activated
{- Listens for new Wicd connections. -}
listenWicdConnections :: Client -> IO () -> IO ()
@@ -121,21 +113,23 @@ listenWicdConnections client callback =
listen client matcher $ \event ->
when (any (== wicd_success) (signalBody event)) $
callback
- where
- matcher = matchAny
- { matchInterface = Just "org.wicd.daemon"
- , matchMember = Just "ConnectResultsSent"
- }
- wicd_success = toVariant ("success" :: String)
+ where
+ matcher = matchAny
+ { matchInterface = Just "org.wicd.daemon"
+ , matchMember = Just "ConnectResultsSent"
+ }
+ wicd_success = toVariant ("success" :: String)
#endif
-handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
-handleConnection st dstatus scanremotes pushnotifier =
- reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier)
- =<< networkRemotes st
+handleConnection :: Assistant ()
+handleConnection = do
+ d <- getAssistant id
+ liftIO . reconnectRemotes (threadName d) (threadState d)
+ (daemonStatusHandle d) (scanRemoteMap d) (Just $ pushNotifier d)
+ =<< networkRemotes
{- Finds network remotes. -}
-networkRemotes :: ThreadState -> IO [Remote]
-networkRemotes st = runThreadState st $
+networkRemotes :: Assistant [Remote]
+networkRemotes = liftAnnex $
filter (isNothing . Remote.localpath) <$> remoteList
diff --git a/Assistant/Threads/PairListener.hs b/Assistant/Threads/PairListener.hs
index 9875dcb8a..116cc0fa1 100644
--- a/Assistant/Threads/PairListener.hs
+++ b/Assistant/Threads/PairListener.hs
@@ -28,7 +28,7 @@ thisThread :: ThreadName
thisThread = "PairListener"
pairListenerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> UrlRenderer -> NamedThread
-pairListenerThread st dstatus scanremotes urlrenderer = thread $ withSocketsDo $
+pairListenerThread st dstatus scanremotes urlrenderer = thread $ liftIO $ withSocketsDo $
runEvery (Seconds 1) $ void $ tryIO $ do
sock <- getsock
go sock [] []
diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs
index dc7099e3d..591c8b18b 100644
--- a/Assistant/Threads/PushNotifier.hs
+++ b/Assistant/Threads/PushNotifier.hs
@@ -35,7 +35,7 @@ controllerThread pushnotifier a = forever $ do
killThread tid
pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread
-pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $
+pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ liftIO $
controllerThread pushnotifier $ do
v <- runThreadState st $ getXMPPCreds
case v of
@@ -45,7 +45,7 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $
loop c starttime = do
void $ connectXMPP c $ \jid -> do
fulljid <- bindJID jid
- liftIO $ debug thisThread ["XMPP connected", show fulljid]
+ liftIO $ brokendebug thisThread ["XMPP connected", show fulljid]
putStanza $ gitAnnexPresence gitAnnexSignature
s <- getSession
_ <- liftIO $ forkIO $ void $ runXMPP s $
@@ -54,10 +54,10 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $
now <- getCurrentTime
if diffUTCTime now starttime > 300
then do
- debug thisThread ["XMPP connection lost; reconnecting"]
+ brokendebug thisThread ["XMPP connection lost; reconnecting"]
loop c now
else do
- debug thisThread ["XMPP connection failed; will retry"]
+ brokendebug thisThread ["XMPP connection failed; will retry"]
threadDelaySeconds (Seconds 300)
loop c =<< getCurrentTime
@@ -67,7 +67,7 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $
receivenotifications = forever $ do
s <- getStanza
- liftIO $ debug thisThread ["received XMPP:", show s]
+ liftIO $ brokendebug thisThread ["received XMPP:", show s]
case s of
ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) ->
liftIO $ pull st dstatus $
@@ -93,7 +93,7 @@ pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO ()
pull _ _ [] = noop
pull st dstatus us = do
rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus
- debug thisThread $ "push notification for" :
+ brokendebug thisThread $ "push notification for" :
map (fromUUID . Remote.uuid ) rs
pullone rs =<< runThreadState st (inRepo Git.Branch.current)
where
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index 671a620b4..a15c0e152 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -25,12 +25,12 @@ thisThread = "Pusher"
{- This thread retries pushes that failed before. -}
pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread
-pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do
+pushRetryThread st dstatus pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds halfhour) $ do
-- We already waited half an hour, now wait until there are failed
-- pushes to retry.
topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
unless (null topush) $ do
- debug thisThread
+ brokendebug thisThread
[ "retrying"
, show (length topush)
, "failed pushes"
@@ -44,7 +44,7 @@ pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds hal
{- This thread pushes git commits out to remotes soon after they are made. -}
pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread
-pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do
+pushThread st dstatus commitchan pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
commits <- getCommits commitchan
@@ -58,7 +58,7 @@ pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Secon
void $ alertWhile dstatus (pushAlert remotes) $
pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes
else do
- debug thisThread
+ brokendebug thisThread
[ "delaying push of"
, show (length commits)
, "commits"
diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs
index 912270090..6379eee46 100644
--- a/Assistant/Threads/SanityChecker.hs
+++ b/Assistant/Threads/SanityChecker.hs
@@ -11,60 +11,56 @@ module Assistant.Threads.SanityChecker (
import Assistant.Common
import Assistant.DaemonStatus
-import Assistant.ThreadedMonad
-import Assistant.Changes
import Assistant.Alert
-import Assistant.TransferQueue
import qualified Git.LsFiles
import Utility.ThreadScheduler
import qualified Assistant.Threads.Watcher as Watcher
import Data.Time.Clock.POSIX
-thisThread :: ThreadName
-thisThread = "SanityChecker"
-
{- This thread wakes up occasionally to make sure the tree is in good shape. -}
-sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread
-sanityCheckerThread st dstatus transferqueue changechan = thread $ forever $ do
- waitForNextCheck dstatus
+sanityCheckerThread :: NamedThread
+sanityCheckerThread = NamedThread "SanityChecker" $ forever $ do
+ waitForNextCheck
- debug thisThread ["starting sanity check"]
+ debug ["starting sanity check"]
- void $ alertWhile dstatus sanityCheckAlert go
+ dstatus <- getAssistant daemonStatusHandle
+ void $ alertWhile dstatus sanityCheckAlert <~> go
- debug thisThread ["sanity check complete"]
- where
- thread = NamedThread thisThread
- go = do
- modifyDaemonStatus_ dstatus $ \s -> s
- { sanityCheckRunning = True }
+ debug ["sanity check complete"]
+ where
+ go = do
+ dstatus <- getAssistant daemonStatusHandle
+ liftIO $ modifyDaemonStatus_ dstatus $ \s -> s
+ { sanityCheckRunning = True }
+
+ now <- liftIO $ getPOSIXTime -- before check started
+ r <- either showerr return =<< tryIO <~> check
- now <- getPOSIXTime -- before check started
- r <- catchIO (check st dstatus transferqueue changechan)
- $ \e -> do
- runThreadState st $ warning $ show e
- return False
+ liftIO $ modifyDaemonStatus_ dstatus $ \s -> s
+ { sanityCheckRunning = False
+ , lastSanityCheck = Just now
+ }
- modifyDaemonStatus_ dstatus $ \s -> s
- { sanityCheckRunning = False
- , lastSanityCheck = Just now
- }
+ return r
- return r
+ showerr e = do
+ liftAnnex $ warning $ show e
+ return False
{- Only run one check per day, from the time of the last check. -}
-waitForNextCheck :: DaemonStatusHandle -> IO ()
-waitForNextCheck dstatus = do
- v <- lastSanityCheck <$> getDaemonStatus dstatus
- now <- getPOSIXTime
- threadDelaySeconds $ Seconds $ calcdelay now v
- where
- calcdelay _ Nothing = oneDay
- calcdelay now (Just lastcheck)
- | lastcheck < now = max oneDay $
- oneDay - truncate (now - lastcheck)
- | otherwise = oneDay
+waitForNextCheck :: Assistant ()
+waitForNextCheck = do
+ v <- lastSanityCheck <$> daemonStatus
+ now <- liftIO getPOSIXTime
+ liftIO $ threadDelaySeconds $ Seconds $ calcdelay now v
+ where
+ calcdelay _ Nothing = oneDay
+ calcdelay now (Just lastcheck)
+ | lastcheck < now = max oneDay $
+ oneDay - truncate (now - lastcheck)
+ | otherwise = oneDay
oneDay :: Int
oneDay = 24 * 60 * 60
@@ -72,29 +68,31 @@ oneDay = 24 * 60 * 60
{- It's important to stay out of the Annex monad as much as possible while
- running potentially expensive parts of this check, since remaining in it
- will block the watcher. -}
-check :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO Bool
-check st dstatus transferqueue changechan = do
- g <- runThreadState st gitRepo
+check :: Assistant Bool
+check = do
+ g <- liftAnnex gitRepo
-- Find old unstaged symlinks, and add them to git.
- (unstaged, cleanup) <- Git.LsFiles.notInRepo False ["."] g
- now <- getPOSIXTime
+ (unstaged, cleanup) <- liftIO $ Git.LsFiles.notInRepo False ["."] g
+ now <- liftIO $ getPOSIXTime
forM_ unstaged $ \file -> do
- ms <- catchMaybeIO $ getSymbolicLinkStatus file
+ ms <- liftIO $ catchMaybeIO $ getSymbolicLinkStatus file
case ms of
Just s | toonew (statusChangeTime s) now -> noop
- | isSymbolicLink s ->
- addsymlink file ms
+ | isSymbolicLink s -> addsymlink file ms
_ -> noop
- void cleanup
+ liftIO $ void cleanup
return True
- where
- toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime)
- slop = fromIntegral tenMinutes
- insanity msg = do
- runThreadState st $ warning msg
- void $ addAlert dstatus $ sanityCheckFixAlert msg
- addsymlink file s = do
- Watcher.runHandler thisThread st dstatus
- transferqueue changechan
- Watcher.onAddSymlink file s
- insanity $ "found unstaged symlink: " ++ file
+ where
+ toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime)
+ slop = fromIntegral tenMinutes
+ insanity msg = do
+ liftAnnex $ warning msg
+ dstatus <- getAssistant daemonStatusHandle
+ liftIO $ void $ addAlert dstatus $ sanityCheckFixAlert msg
+ addsymlink file s = do
+ d <- getAssistant id
+ liftIO $ Watcher.runHandler (threadName d)
+ (threadState d) (daemonStatusHandle d)
+ (transferQueue d) (changeChan d)
+ Watcher.onAddSymlink file s
+ insanity $ "found unstaged symlink: " ++ file
diff --git a/Assistant/Threads/TransferPoller.hs b/Assistant/Threads/TransferPoller.hs
index afead63ec..6f54336bb 100644
--- a/Assistant/Threads/TransferPoller.hs
+++ b/Assistant/Threads/TransferPoller.hs
@@ -8,7 +8,6 @@
module Assistant.Threads.TransferPoller where
import Assistant.Common
-import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Logs.Transfer
import Utility.NotificationBroadcaster
@@ -17,46 +16,42 @@ import qualified Assistant.Threads.TransferWatcher as TransferWatcher
import Control.Concurrent
import qualified Data.Map as M
-thisThread :: ThreadName
-thisThread = "TransferPoller"
-
{- This thread polls the status of ongoing transfers, determining how much
- of each transfer is complete. -}
-transferPollerThread :: ThreadState -> DaemonStatusHandle -> NamedThread
-transferPollerThread st dstatus = thread $ do
- g <- runThreadState st gitRepo
- tn <- newNotificationHandle =<<
- transferNotifier <$> getDaemonStatus dstatus
+transferPollerThread :: NamedThread
+transferPollerThread = NamedThread "TransferPoller" $ do
+ g <- liftAnnex gitRepo
+ tn <- liftIO . newNotificationHandle =<<
+ transferNotifier <$> daemonStatus
forever $ do
- threadDelay 500000 -- 0.5 seconds
- ts <- currentTransfers <$> getDaemonStatus dstatus
+ liftIO $ threadDelay 500000 -- 0.5 seconds
+ ts <- currentTransfers <$> daemonStatus
if M.null ts
- then waitNotification tn -- block until transfers running
+ -- block until transfers running
+ then liftIO $ waitNotification tn
else mapM_ (poll g) $ M.toList ts
- where
- thread = NamedThread thisThread
- poll g (t, info)
- {- Downloads are polled by checking the size of the
- - temp file being used for the transfer. -}
- | transferDirection t == Download = do
- let f = gitAnnexTmpLocation (transferKey t) g
- sz <- catchMaybeIO $
- fromIntegral . fileSize
- <$> getFileStatus f
- newsize t info sz
- {- Uploads don't need to be polled for when the
- - TransferWatcher thread can track file
- - modifications. -}
- | TransferWatcher.watchesTransferSize = noop
- {- Otherwise, this code polls the upload progress
- - by reading the transfer info file. -}
- | otherwise = do
- let f = transferFile t g
- mi <- catchDefaultIO Nothing $
- readTransferInfoFile Nothing f
- maybe noop (newsize t info . bytesComplete) mi
- newsize t info sz
- | bytesComplete info /= sz && isJust sz =
- alterTransferInfo dstatus t $
- \i -> i { bytesComplete = sz }
- | otherwise = noop
+ where
+ poll g (t, info)
+ {- Downloads are polled by checking the size of the
+ - temp file being used for the transfer. -}
+ | transferDirection t == Download = do
+ let f = gitAnnexTmpLocation (transferKey t) g
+ sz <- liftIO $ catchMaybeIO $
+ fromIntegral . fileSize <$> getFileStatus f
+ newsize t info sz
+ {- Uploads don't need to be polled for when the TransferWatcher
+ - thread can track file modifications. -}
+ | TransferWatcher.watchesTransferSize = noop
+ {- Otherwise, this code polls the upload progress
+ - by reading the transfer info file. -}
+ | otherwise = do
+ let f = transferFile t g
+ mi <- liftIO $ catchDefaultIO Nothing $
+ readTransferInfoFile Nothing f
+ maybe noop (newsize t info . bytesComplete) mi
+
+ newsize t info sz
+ | bytesComplete info /= sz && isJust sz =
+ alterTransferInfo t (\i -> i { bytesComplete = sz })
+ <<~ daemonStatusHandle
+ | otherwise = noop
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
index 631c36b02..28df518aa 100644
--- a/Assistant/Threads/TransferScanner.hs
+++ b/Assistant/Threads/TransferScanner.hs
@@ -34,7 +34,7 @@ thisThread = "TransferScanner"
- that need to be made, to keep data in sync.
-}
transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> NamedThread
-transferScannerThread st dstatus scanremotes transferqueue = thread $ do
+transferScannerThread st dstatus scanremotes transferqueue = thread $ liftIO $ do
startupScan
go S.empty
where
@@ -100,7 +100,7 @@ failedTransferScan st dstatus transferqueue r = do
-}
expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> [Remote] -> IO ()
expensiveScan st dstatus transferqueue rs = unless onlyweb $ do
- liftIO $ debug thisThread ["starting scan of", show visiblers]
+ brokendebug thisThread ["starting scan of", show visiblers]
void $ alertWhile dstatus (scanAlert visiblers) $ do
g <- runThreadState st gitRepo
(files, cleanup) <- LsFiles.inRepo [] g
@@ -110,13 +110,13 @@ expensiveScan st dstatus transferqueue rs = unless onlyweb $ do
mapM_ (enqueue f) ts
void cleanup
return True
- liftIO $ debug thisThread ["finished scan of", show visiblers]
+ brokendebug thisThread ["finished scan of", show visiblers]
where
onlyweb = all (== webUUID) $ map Remote.uuid rs
visiblers = let rs' = filter (not . Remote.readonly) rs
in if null rs' then rs else rs'
enqueue f (r, t) = do
- debug thisThread ["queuing", show t]
+ brokendebug thisThread ["queuing", show t]
queueTransferWhenSmall transferqueue dstatus (Just f) t r
findtransfers f (key, _) = do
locs <- loggedLocations key
diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs
index 168ff2688..bb195e519 100644
--- a/Assistant/Threads/TransferWatcher.hs
+++ b/Assistant/Threads/TransferWatcher.hs
@@ -26,7 +26,7 @@ thisThread = "TransferWatcher"
{- This thread watches for changes to the gitAnnexTransferDir,
- and updates the DaemonStatus's map of ongoing transfers. -}
transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> NamedThread
-transferWatcherThread st dstatus transferqueue = thread $ do
+transferWatcherThread st dstatus transferqueue = thread $ liftIO $ do
g <- runThreadState st gitRepo
let dir = gitAnnexTransferDir g
createDirectoryIfMissing True dir
@@ -38,7 +38,7 @@ transferWatcherThread st dstatus transferqueue = thread $ do
, errHook = hook onErr
}
void $ watchDir dir (const False) hooks id
- debug thisThread ["watching for transfers"]
+ brokendebug thisThread ["watching for transfers"]
where
thread = NamedThread thisThread
@@ -66,7 +66,7 @@ onAdd st dstatus _ file _ = case parseTransferFile file of
where
go _ Nothing = noop -- transfer already finished
go t (Just info) = do
- debug thisThread
+ brokendebug thisThread
[ "transfer starting:"
, show t
]
@@ -87,8 +87,9 @@ onModify _ dstatus _ file _ = do
Just t -> go t =<< readTransferInfoFile Nothing file
where
go _ Nothing = noop
- go t (Just newinfo) = alterTransferInfo dstatus t $ \info ->
- info { bytesComplete = bytesComplete newinfo }
+ go t (Just newinfo) = alterTransferInfo t
+ (\i -> i { bytesComplete = bytesComplete newinfo })
+ dstatus
{- This thread can only watch transfer sizes when the DirWatcher supports
- tracking modificatons to files. -}
@@ -100,7 +101,7 @@ onDel :: Handler
onDel st dstatus transferqueue file _ = case parseTransferFile file of
Nothing -> noop
Just t -> do
- debug thisThread
+ brokendebug thisThread
[ "transfer finishing:"
, show t
]
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
index 30d736073..2e880bef9 100644
--- a/Assistant/Threads/Transferrer.hs
+++ b/Assistant/Threads/Transferrer.hs
@@ -32,7 +32,7 @@ maxTransfers = 1
{- Dispatches transfers from the queue. -}
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> CommitChan -> NamedThread
-transfererThread st dstatus transferqueue slots commitchan = thread $ go =<< readProgramFile
+transfererThread st dstatus transferqueue slots commitchan = thread $ liftIO $ go =<< readProgramFile
where
thread = NamedThread thisThread
go program = forever $ inTransferSlot dstatus slots $
@@ -47,11 +47,11 @@ startTransfer :: ThreadState -> DaemonStatusHandle -> CommitChan -> FilePath ->
startTransfer st dstatus commitchan program t info = case (transferRemote info, associatedFile info) of
(Just remote, Just file) -> ifM (runThreadState st $ shouldTransfer t info)
( do
- debug thisThread [ "Transferring:" , show t ]
+ brokendebug thisThread [ "Transferring:" , show t ]
notifyTransfer dstatus
return $ Just (t, info, transferprocess remote file)
, do
- debug thisThread [ "Skipping unnecessary transfer:" , show t ]
+ brokendebug thisThread [ "Skipping unnecessary transfer:" , show t ]
void $ removeTransfer dstatus t
return Nothing
)
diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs
index 5d24fe23f..7ab124b14 100644
--- a/Assistant/Threads/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -56,9 +56,9 @@ needLsof = error $ unlines
]
watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread
-watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do
+watchThread st dstatus transferqueue changechan = NamedThread thisThread $ liftIO $ do
void $ watchDir "." ignored hooks startup
- debug thisThread [ "watching", "."]
+ brokendebug thisThread [ "watching", "."]
where
startup = startupScan st dstatus
hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a
@@ -132,7 +132,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
checkcontent key s
ensurestaged link s
, do
- liftIO $ debug threadname ["fix symlink", file]
+ liftIO $ brokendebug threadname ["fix symlink", file]
liftIO $ removeFile file
liftIO $ createSymbolicLink link file
checkcontent key =<< liftIO (getDaemonStatus dstatus)
@@ -162,7 +162,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
{- For speed, tries to reuse the existing blob for symlink target. -}
addlink link = do
- liftIO $ debug threadname ["add symlink", file]
+ liftIO $ brokendebug threadname ["add symlink", file]
v <- catObjectDetails $ Ref $ ':':file
case v of
Just (currlink, sha)
@@ -187,7 +187,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
onDel :: Handler
onDel threadname file _ _dstatus _ = do
- liftIO $ debug threadname ["file deleted", file]
+ liftIO $ brokendebug threadname ["file deleted", file]
Annex.Queue.addUpdateIndex =<<
inRepo (Git.UpdateIndex.unstageFile file)
madeChange file RmChange
@@ -201,7 +201,7 @@ onDel threadname file _ _dstatus _ = do
- just as good. -}
onDelDir :: Handler
onDelDir threadname dir _ _dstatus _ = do
- liftIO $ debug threadname ["directory deleted", dir]
+ liftIO $ brokendebug threadname ["directory deleted", dir]
Annex.Queue.addCommand "rm"
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
madeChange dir RmDirChange
diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs
index bb8fcd186..02911bab9 100644
--- a/Assistant/Threads/WebApp.hs
+++ b/Assistant/Threads/WebApp.hs
@@ -52,7 +52,7 @@ webAppThread
-> Maybe (IO String)
-> Maybe (Url -> FilePath -> IO ())
-> NamedThread
-webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ do
+webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ liftIO $ do
webapp <- WebApp
<$> pure assistantdata
<*> (pack <$> genRandomToken)
@@ -83,7 +83,7 @@ webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $
(relHome =<< absPath
=<< runThreadState (threadState assistantdata) (fromRepo repoPath))
go port webapp htmlshim urlfile = do
- debug thisThread ["running on port", show port]
+ brokendebug thisThread ["running on port", show port]
let url = myUrl webapp port
maybe noop (`writeFile` url) urlfile
writeHtmlShim url htmlshim