summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-10-30 17:14:26 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-10-30 17:14:51 -0400
commit4318f594d544320825093de8661ed1b40e4774d5 (patch)
tree709dcd2fe739c503651bc7bd5e1df35a52a27977 /Assistant
parent07cd1b2b40735d460c8225762fcf3992b9886c60 (diff)
finished pushing Assistant monad into all relevant files
All temporary and old functions are removed.
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/DaemonStatus.hs23
-rw-r--r--Assistant/Drop.hs13
-rw-r--r--Assistant/Monad.hs9
-rw-r--r--Assistant/Pairing/Network.hs85
-rw-r--r--Assistant/Threads/Committer.hs4
-rw-r--r--Assistant/Threads/Merger.hs7
-rw-r--r--Assistant/Threads/MountWatcher.hs4
-rw-r--r--Assistant/Threads/NetWatcher.hs2
-rw-r--r--Assistant/Threads/PairListener.hs18
-rw-r--r--Assistant/Threads/PushNotifier.hs8
-rw-r--r--Assistant/Threads/TransferScanner.hs42
-rw-r--r--Assistant/Threads/TransferWatcher.hs12
-rw-r--r--Assistant/Threads/Transferrer.hs14
-rw-r--r--Assistant/Threads/Watcher.hs11
-rw-r--r--Assistant/TransferQueue.hs209
-rw-r--r--Assistant/TransferSlots.hs26
16 files changed, 230 insertions, 257 deletions
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index 4223b6ce9..4744c86ba 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -24,30 +24,12 @@ import Data.Time
import System.Locale
import qualified Data.Map as M
--- TODO remove this
-getDaemonStatusOld :: DaemonStatusHandle -> IO DaemonStatus
-getDaemonStatusOld = atomically . readTMVar
-
getDaemonStatus :: Assistant DaemonStatus
getDaemonStatus = (atomically . readTMVar) <<~ daemonStatusHandle
--- TODO remove this
-modifyDaemonStatusOld_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> IO ()
-modifyDaemonStatusOld_ dstatus a = modifyDaemonStatusOld dstatus $ \s -> (a s, ())
-
modifyDaemonStatus_ :: (DaemonStatus -> DaemonStatus) -> Assistant ()
modifyDaemonStatus_ a = modifyDaemonStatus $ \s -> (a s, ())
--- TODO remove this
-modifyDaemonStatusOld :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> IO b
-modifyDaemonStatusOld dstatus a = do
- (s, b) <- atomically $ do
- r@(s, _) <- a <$> takeTMVar dstatus
- putTMVar dstatus s
- return r
- sendNotification $ changeNotifier s
- return b
-
modifyDaemonStatus :: (DaemonStatus -> (DaemonStatus, b)) -> Assistant b
modifyDaemonStatus a = do
dstatus <- getAssistant daemonStatusHandle
@@ -188,11 +170,6 @@ notifyTransfer = do
liftIO $ sendNotification
=<< transferNotifier <$> atomically (readTMVar dstatus)
--- TODO remove
-notifyTransferOld :: DaemonStatusHandle -> IO ()
-notifyTransferOld dstatus = sendNotification
- =<< transferNotifier <$> atomically (readTMVar dstatus)
-
{- Send a notification when alerts are changed. -}
notifyAlert :: Assistant ()
notifyAlert = do
diff --git a/Assistant/Drop.hs b/Assistant/Drop.hs
index 021e40a87..ed9ba577e 100644
--- a/Assistant/Drop.hs
+++ b/Assistant/Drop.hs
@@ -20,12 +20,13 @@ import Config
{- Drop from local and/or remote when allowed by the preferred content and
- numcopies settings. -}
-handleDrops :: DaemonStatusHandle -> Bool -> Key -> AssociatedFile -> Annex ()
-handleDrops _ _ _ Nothing = noop
-handleDrops dstatus fromhere key f = do
- syncrs <- liftIO $ syncRemotes <$> getDaemonStatusOld dstatus
- locs <- loggedLocations key
- handleDrops' locs syncrs fromhere key f
+handleDrops :: Bool -> Key -> AssociatedFile -> Assistant ()
+handleDrops _ _ Nothing = noop
+handleDrops fromhere key f = do
+ syncrs <- syncRemotes <$> getDaemonStatus
+ liftAnnex $ do
+ locs <- loggedLocations key
+ handleDrops' locs syncrs fromhere key f
handleDrops' :: [UUID] -> [Remote] -> Bool -> Key -> AssociatedFile -> Annex ()
handleDrops' _ _ _ _ Nothing = noop
diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs
index 0dfd4e34d..fb4cb3340 100644
--- a/Assistant/Monad.hs
+++ b/Assistant/Monad.hs
@@ -17,6 +17,7 @@ module Assistant.Monad (
(<~>),
(<<~),
asIO,
+ asIO1,
asIO2,
) where
@@ -95,12 +96,16 @@ io <~> a = do
liftIO $ io $ runAssistant a d
{- Creates an IO action that will run an Assistant action when run. -}
-asIO :: (a -> Assistant b) -> Assistant (a -> IO b)
+asIO :: Assistant a -> Assistant (IO a)
asIO a = do
d <- reader id
+ return $ runAssistant a d
+
+asIO1 :: (a -> Assistant b) -> Assistant (a -> IO b)
+asIO1 a = do
+ d <- reader id
return $ \v -> runAssistant (a v) d
-{- Creates an IO action that will run an Assistant action when run. -}
asIO2 :: (a -> b -> Assistant c) -> Assistant (a -> b -> IO c)
asIO2 a = do
d <- reader id
diff --git a/Assistant/Pairing/Network.hs b/Assistant/Pairing/Network.hs
index 9b030617e..9ee1db3c6 100644
--- a/Assistant/Pairing/Network.hs
+++ b/Assistant/Pairing/Network.hs
@@ -50,47 +50,50 @@ multicastAddress (IPv6Addr _) = "ff02::fb"
-}
multicastPairMsg :: Maybe Int -> Secret -> PairData -> PairStage -> IO ()
multicastPairMsg repeats secret pairdata stage = go M.empty repeats
- where
- go _ (Just 0) = noop
- go cache n = do
- addrs <- activeNetworkAddresses
- let cache' = updatecache cache addrs
- mapM_ (sendinterface cache') addrs
- threadDelaySeconds (Seconds 2)
- go cache' $ pred <$> n
- {- The multicast library currently chokes on ipv6 addresses. -}
- sendinterface _ (IPv6Addr _) = noop
- sendinterface cache i = void $ catchMaybeIO $
- withSocketsDo $ bracket setup cleanup use
- where
- setup = multicastSender (multicastAddress i) pairingPort
- cleanup (sock, _) = sClose sock -- FIXME does not work
- use (sock, addr) = do
- setInterface sock (showAddr i)
- maybe noop (\s -> void $ sendTo sock s addr)
- (M.lookup i cache)
- updatecache cache [] = cache
- updatecache cache (i:is)
- | M.member i cache = updatecache cache is
- | otherwise = updatecache (M.insert i (show $ mkmsg i) cache) is
- mkmsg addr = PairMsg $
- mkVerifiable (stage, pairdata, addr) secret
+ where
+ go _ (Just 0) = noop
+ go cache n = do
+ addrs <- activeNetworkAddresses
+ let cache' = updatecache cache addrs
+ mapM_ (sendinterface cache') addrs
+ threadDelaySeconds (Seconds 2)
+ go cache' $ pred <$> n
+ {- The multicast library currently chokes on ipv6 addresses. -}
+ sendinterface _ (IPv6Addr _) = noop
+ sendinterface cache i = void $ catchMaybeIO $
+ withSocketsDo $ bracket setup cleanup use
+ where
+ setup = multicastSender (multicastAddress i) pairingPort
+ cleanup (sock, _) = sClose sock -- FIXME does not work
+ use (sock, addr) = do
+ setInterface sock (showAddr i)
+ maybe noop (\s -> void $ sendTo sock s addr)
+ (M.lookup i cache)
+ updatecache cache [] = cache
+ updatecache cache (i:is)
+ | M.member i cache = updatecache cache is
+ | otherwise = updatecache (M.insert i (show $ mkmsg i) cache) is
+ mkmsg addr = PairMsg $
+ mkVerifiable (stage, pairdata, addr) secret
-startSending :: DaemonStatusHandle -> PairingInProgress -> PairStage -> (PairStage -> IO ()) -> IO ()
-startSending dstatus pip stage sender = void $ forkIO $ do
- tid <- myThreadId
- let pip' = pip { inProgressPairStage = stage, inProgressThreadId = Just tid }
- oldpip <- modifyDaemonStatusOld dstatus $
- \s -> (s { pairingInProgress = Just pip' }, pairingInProgress s)
- maybe noop stopold oldpip
- sender stage
- where
- stopold = maybe noop killThread . inProgressThreadId
+startSending :: PairingInProgress -> PairStage -> (PairStage -> IO ()) -> Assistant ()
+startSending pip stage sender = do
+ a <- asIO start
+ void $ liftIO $ forkIO a
+ where
+ start = do
+ tid <- liftIO myThreadId
+ let pip' = pip { inProgressPairStage = stage, inProgressThreadId = Just tid }
+ oldpip <- modifyDaemonStatus $
+ \s -> (s { pairingInProgress = Just pip' }, pairingInProgress s)
+ maybe noop stopold oldpip
+ liftIO $ sender stage
+ stopold = maybe noop (liftIO . killThread) . inProgressThreadId
-stopSending :: PairingInProgress -> DaemonStatusHandle -> IO ()
-stopSending pip dstatus = do
- maybe noop killThread $ inProgressThreadId pip
- modifyDaemonStatusOld_ dstatus $ \s -> s { pairingInProgress = Nothing }
+stopSending :: PairingInProgress -> Assistant ()
+stopSending pip = do
+ maybe noop (liftIO . killThread) $ inProgressThreadId pip
+ modifyDaemonStatus_ $ \s -> s { pairingInProgress = Nothing }
class ToSomeAddr a where
toSomeAddr :: a -> SomeAddr
@@ -123,5 +126,5 @@ pairRepo msg = concat
, ":"
, remoteDirectory d
]
- where
- d = pairMsgData msg
+ where
+ d = pairMsgData msg
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
index d73dc1eb0..445e44dea 100644
--- a/Assistant/Threads/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -202,9 +202,7 @@ handleAdds delayadd cs = returnWhen (null incomplete) $ do
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
showEndOk
- transferqueue <- getAssistant transferQueue
- dstatus <- getAssistant daemonStatusHandle
- liftAnnex $ queueTransfers Next transferqueue dstatus key (Just file) Upload
+ queueTransfers Next key (Just file) Upload
return $ Just change
{- Check that the keysource's keyFilename still exists,
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs
index 46511701c..44056dc35 100644
--- a/Assistant/Threads/Merger.hs
+++ b/Assistant/Threads/Merger.hs
@@ -67,11 +67,8 @@ onAdd file
| ".lock" `isSuffixOf` file = noop
| isAnnexBranch file = do
branchChanged
- transferqueue <- getAssistant transferQueue
- dstatus <- getAssistant daemonStatusHandle
- liftAnnex $
- whenM Annex.Branch.forceUpdate $
- queueDeferredDownloads Later transferqueue dstatus
+ whenM (liftAnnex Annex.Branch.forceUpdate) $
+ queueDeferredDownloads Later
| "/synced/" `isInfixOf` file = do
mergecurrent =<< liftAnnex (inRepo Git.Branch.current)
| otherwise = noop
diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs
index bb63e840f..d3da50dd4 100644
--- a/Assistant/Threads/MountWatcher.hs
+++ b/Assistant/Threads/MountWatcher.hs
@@ -48,7 +48,7 @@ mountWatcherThread = NamedThread "MountWatcher" $
dbusThread :: Assistant ()
dbusThread = do
- runclient <- asIO go
+ runclient <- asIO1 go
r <- liftIO $ E.try $ runClient getSessionAddress runclient
either onerr (const noop) r
where
@@ -59,7 +59,7 @@ dbusThread = do
- mount point from the dbus message, but this is
- easier. -}
mvar <- liftIO $ newMVar =<< currentMountPoints
- handleevent <- asIO $ \_event -> do
+ handleevent <- asIO1 $ \_event -> do
nowmounted <- liftIO $ currentMountPoints
wasmounted <- liftIO $ swapMVar mvar nowmounted
handleMounts wasmounted nowmounted
diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs
index 9df4f3a4d..4396b2632 100644
--- a/Assistant/Threads/NetWatcher.hs
+++ b/Assistant/Threads/NetWatcher.hs
@@ -49,7 +49,7 @@ netWatcherFallbackThread = NamedThread "NetWatcherFallback" $
dbusThread :: Assistant ()
dbusThread = do
handleerr <- asIO2 onerr
- runclient <- asIO go
+ runclient <- asIO1 go
liftIO $ persistentClient getSystemAddress () handleerr runclient
where
go client = ifM (checkNetMonitor client)
diff --git a/Assistant/Threads/PairListener.hs b/Assistant/Threads/PairListener.hs
index f682dd6da..f29bec4b4 100644
--- a/Assistant/Threads/PairListener.hs
+++ b/Assistant/Threads/PairListener.hs
@@ -27,7 +27,7 @@ thisThread = "PairListener"
pairListenerThread :: UrlRenderer -> NamedThread
pairListenerThread urlrenderer = NamedThread "PairListener" $ do
- listener <- asIO $ go [] []
+ listener <- asIO1 $ go [] []
liftIO $ withSocketsDo $
runEvery (Seconds 1) $ void $ tryIO $
listener =<< getsock
@@ -69,7 +69,7 @@ pairListenerThread urlrenderer = NamedThread "PairListener" $ do
| not verified && sameuuid = do
liftAnnex $ warning
"detected possible pairing brute force attempt; disabled pairing"
- stopSending pip <<~ daemonStatusHandle
+ stopSending pip
return (Nothing, False)
|otherwise = return (Just pip, verified && sameuuid)
where
@@ -104,7 +104,7 @@ pairReqReceived :: Bool -> UrlRenderer -> PairMsg -> Assistant ()
pairReqReceived True _ _ = noop -- ignore our own PairReq
pairReqReceived False urlrenderer msg = do
url <- liftIO $ renderUrl urlrenderer (FinishPairR msg) []
- close <- asIO removeAlert
+ close <- asIO1 removeAlert
void $ addAlert $ pairRequestReceivedAlert repo
AlertButton
{ buttonUrl = url
@@ -119,11 +119,10 @@ pairReqReceived False urlrenderer msg = do
- and send a single PairDone. -}
pairAckReceived :: Bool -> Maybe PairingInProgress -> PairMsg -> [PairingInProgress] -> Assistant [PairingInProgress]
pairAckReceived True (Just pip) msg cache = do
- stopSending pip <<~ daemonStatusHandle
+ stopSending pip
liftIO $ setupAuthorizedKeys msg
finishedPairing msg (inProgressSshKeyPair pip)
- dstatus <- getAssistant daemonStatusHandle
- liftIO $ startSending dstatus pip PairDone $ multicastPairMsg
+ startSending pip PairDone $ multicastPairMsg
(Just 1) (inProgressSecret pip) (inProgressPairData pip)
return $ pip : take 10 cache
{- A stale PairAck might also be seen, after we've finished pairing.
@@ -132,10 +131,9 @@ pairAckReceived True (Just pip) msg cache = do
- response to stale PairAcks for them. -}
pairAckReceived _ _ msg cache = do
let pips = filter (verifiedPairMsg msg) cache
- dstatus <- getAssistant daemonStatusHandle
unless (null pips) $
- liftIO $ forM_ pips $ \pip ->
- startSending dstatus pip PairDone $ multicastPairMsg
+ forM_ pips $ \pip ->
+ startSending pip PairDone $ multicastPairMsg
(Just 1) (inProgressSecret pip) (inProgressPairData pip)
return cache
@@ -152,5 +150,5 @@ pairDoneReceived :: Bool -> Maybe PairingInProgress -> PairMsg -> Assistant ()
pairDoneReceived False _ _ = noop -- not verified
pairDoneReceived True Nothing _ = noop -- not in progress
pairDoneReceived True (Just pip) msg = do
- stopSending pip <<~ daemonStatusHandle
+ stopSending pip
finishedPairing msg (inProgressSshKeyPair pip)
diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs
index b50a2e4b9..d2d5e08bf 100644
--- a/Assistant/Threads/PushNotifier.hs
+++ b/Assistant/Threads/PushNotifier.hs
@@ -26,10 +26,10 @@ import Data.Time.Clock
pushNotifierThread :: NamedThread
pushNotifierThread = NamedThread "PushNotifier" $ do
- iodebug <- asIO debug
- iopull <- asIO pull
- iowaitpush <- asIO $ const waitPush
- ioclient <- asIO2 $ xmppClient $ iowaitpush ()
+ iodebug <- asIO1 debug
+ iopull <- asIO1 pull
+ iowaitpush <- asIO $ waitPush
+ ioclient <- asIO2 $ xmppClient $ iowaitpush
forever $ do
tid <- liftIO $ forkIO $ ioclient iodebug iopull
waitRestart
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
index c37b1e3b9..3b3c3f304 100644
--- a/Assistant/Threads/TransferScanner.hs
+++ b/Assistant/Threads/TransferScanner.hs
@@ -21,7 +21,7 @@ import qualified Remote
import qualified Types.Remote as Remote
import Utility.ThreadScheduler
import qualified Git.LsFiles as LsFiles
-import Command
+import qualified Backend
import Annex.Content
import Annex.Wanted
@@ -78,11 +78,7 @@ failedTransferScan r = do
- that the remote doesn't already have the
- key, so it's not redundantly checked here. -}
requeue t info
- requeue t info = do
- transferqueue <- getAssistant transferQueue
- dstatus <- getAssistant daemonStatusHandle
- liftIO $ queueTransferWhenSmall
- transferqueue dstatus (associatedFile info) t r
+ requeue t info = queueTransferWhenSmall (associatedFile info) t r
{- This is a expensive scan through the full git work tree, finding
- files to transfer. The scan is blocked when the transfer queue gets
@@ -101,10 +97,9 @@ expensiveScan rs = unless onlyweb $ do
void $ alertWhile (scanAlert visiblers) $ do
g <- liftAnnex gitRepo
(files, cleanup) <- liftIO $ LsFiles.inRepo [] g
- dstatus <- getAssistant daemonStatusHandle
forM_ files $ \f -> do
- ts <- liftAnnex $
- ifAnnexed f (findtransfers dstatus f) (return [])
+ ts <- maybe (return []) (findtransfers f)
+ =<< liftAnnex (Backend.lookupFile f)
mapM_ (enqueue f) ts
void $ liftIO cleanup
return True
@@ -115,25 +110,24 @@ expensiveScan rs = unless onlyweb $ do
in if null rs' then rs else rs'
enqueue f (r, t) = do
debug ["queuing", show t]
- transferqueue <- getAssistant transferQueue
- dstatus <- getAssistant daemonStatusHandle
- liftIO $ queueTransferWhenSmall transferqueue dstatus (Just f) t r
- findtransfers dstatus f (key, _) = do
- locs <- loggedLocations key
+ queueTransferWhenSmall (Just f) t r
+ findtransfers f (key, _) = do
{- The syncable remotes may have changed since this
- scan began. -}
- syncrs <- liftIO $ syncRemotes <$> getDaemonStatusOld dstatus
- present <- inAnnex key
+ syncrs <- syncRemotes <$> getDaemonStatus
+ liftAnnex $ do
+ locs <- loggedLocations key
+ present <- inAnnex key
- handleDrops' locs syncrs present key (Just f)
+ handleDrops' locs syncrs present key (Just f)
- let slocs = S.fromList locs
- let use a = return $ catMaybes $ map (a key slocs) syncrs
- if present
- then filterM (wantSend (Just f) . Remote.uuid . fst)
- =<< use (genTransfer Upload False)
- else ifM (wantGet $ Just f)
- ( use (genTransfer Download True) , return [] )
+ let slocs = S.fromList locs
+ let use a = return $ catMaybes $ map (a key slocs) syncrs
+ if present
+ then filterM (wantSend (Just f) . Remote.uuid . fst)
+ =<< use (genTransfer Upload False)
+ else ifM (wantGet $ Just f)
+ ( use (genTransfer Download True) , return [] )
genTransfer :: Direction -> Bool -> Key -> S.Set UUID -> Remote -> Maybe (Remote, Transfer)
genTransfer direction want key slocs r
diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs
index f18a2acd8..7b789b8b6 100644
--- a/Assistant/Threads/TransferWatcher.hs
+++ b/Assistant/Threads/TransferWatcher.hs
@@ -115,15 +115,9 @@ finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant ()
finishedTransfer t (Just info)
| transferDirection t == Download =
whenM (liftAnnex $ inAnnex $ transferKey t) $ do
- dstatus <- getAssistant daemonStatusHandle
- transferqueue <- getAssistant transferQueue
- liftAnnex $ handleDrops dstatus False
- (transferKey t) (associatedFile info)
- liftAnnex $ queueTransfersMatching (/= transferUUID t)
- Later transferqueue dstatus
+ handleDrops False (transferKey t) (associatedFile info)
+ queueTransfersMatching (/= transferUUID t) Later
(transferKey t) (associatedFile info) Upload
- | otherwise = do
- dstatus <- getAssistant daemonStatusHandle
- liftAnnex $ handleDrops dstatus True (transferKey t) (associatedFile info)
+ | otherwise = handleDrops True (transferKey t) (associatedFile info)
finishedTransfer _ _ = noop
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
index c60790f9b..84013eaa7 100644
--- a/Assistant/Threads/Transferrer.hs
+++ b/Assistant/Threads/Transferrer.hs
@@ -30,26 +30,22 @@ maxTransfers = 1
transfererThread :: NamedThread
transfererThread = NamedThread "Transferr" $ do
program <- liftIO readProgramFile
- transferqueue <- getAssistant transferQueue
- dstatus <- getAssistant daemonStatusHandle
- starter <- asIO2 $ startTransfer program
- forever $ inTransferSlot $ liftIO $
- maybe (return Nothing) (uncurry starter)
- =<< getNextTransfer transferqueue dstatus notrunning
+ forever $ inTransferSlot $
+ maybe (return Nothing) (uncurry $ startTransfer program)
+ =<< getNextTransfer notrunning
where
{- Skip transfers that are already running. -}
notrunning = isNothing . startedTime
{- By the time this is called, the daemonstatus's transfer map should
- already have been updated to include the transfer. -}
-startTransfer :: FilePath -> Transfer -> TransferInfo -> Assistant (Maybe (Transfer, TransferInfo, IO ()))
+startTransfer :: FilePath -> Transfer -> TransferInfo -> Assistant (Maybe (Transfer, TransferInfo, Assistant ()))
startTransfer program t info = case (transferRemote info, associatedFile info) of
(Just remote, Just file) -> ifM (liftAnnex $ shouldTransfer t info)
( do
debug [ "Transferring:" , show t ]
notifyTransfer
- tp <- asIO2 transferprocess
- return $ Just (t, info, tp remote file)
+ return $ Just (t, info, transferprocess remote file)
, do
debug [ "Skipping unnecessary transfer:" , show t ]
void $ removeTransfer t
diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs
index 7dcde1f46..a74976deb 100644
--- a/Assistant/Threads/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -54,7 +54,7 @@ needLsof = error $ unlines
watchThread :: NamedThread
watchThread = NamedThread "Watcher" $ do
- startup <- asIO startupScan
+ startup <- asIO1 startupScan
addhook <- hook onAdd
delhook <- hook onDel
addsymlinkhook <- hook onAddSymlink
@@ -182,12 +182,9 @@ onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file)
checkcontent key daemonstatus
| scanComplete daemonstatus = do
present <- liftAnnex $ inAnnex key
- dstatus <- getAssistant daemonStatusHandle
- unless present $ do
- transferqueue <- getAssistant transferQueue
- liftAnnex $ queueTransfers Next transferqueue
- dstatus key (Just file) Download
- liftAnnex $ handleDrops dstatus present key (Just file)
+ unless present $
+ queueTransfers Next key (Just file) Download
+ handleDrops present key (Just file)
| otherwise = noop
onDel :: Handler
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index 11c2d58d8..8e403cc43 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -21,9 +21,8 @@ module Assistant.TransferQueue (
dequeueTransfers,
) where
-import Common.Annex
+import Assistant.Common
import Assistant.DaemonStatus
-import Assistant.Types.DaemonStatus
import Assistant.Types.TransferQueue
import Logs.Transfer
import Types.Remote
@@ -35,8 +34,8 @@ import Control.Concurrent.STM
import qualified Data.Map as M
{- Reads the queue's content without blocking or changing it. -}
-getTransferQueue :: TransferQueue -> IO [(Transfer, TransferInfo)]
-getTransferQueue q = atomically $ readTVar $ queuelist q
+getTransferQueue :: Assistant [(Transfer, TransferInfo)]
+getTransferQueue = (atomically . readTVar . queuelist) <<~ transferQueue
stubInfo :: AssociatedFile -> Remote -> TransferInfo
stubInfo f r = stubTransferInfo
@@ -46,101 +45,104 @@ stubInfo f r = stubTransferInfo
{- Adds transfers to queue for some of the known remotes.
- Honors preferred content settings, only transferring wanted files. -}
-queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
+queueTransfers :: Schedule -> Key -> AssociatedFile -> Direction -> Assistant ()
queueTransfers = queueTransfersMatching (const True)
{- Adds transfers to queue for some of the known remotes, that match a
- condition. Honors preferred content settings. -}
-queueTransfersMatching :: (UUID -> Bool) -> Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
-queueTransfersMatching matching schedule q dstatus k f direction
- | direction == Download = whenM (wantGet f) go
+queueTransfersMatching :: (UUID -> Bool) -> Schedule -> Key -> AssociatedFile -> Direction -> Assistant ()
+queueTransfersMatching matching schedule k f direction
+ | direction == Download = whenM (liftAnnex $ wantGet f) go
| otherwise = go
- where
- go = do
- rs <- sufficientremotes
- =<< syncRemotes <$> liftIO (getDaemonStatusOld dstatus)
- let matchingrs = filter (matching . Remote.uuid) rs
- if null matchingrs
- then defer
- else forM_ matchingrs $ \r -> liftIO $
- enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
- sufficientremotes rs
- {- Queue downloads from all remotes that
- - have the key, with the cheapest ones first.
- - More expensive ones will only be tried if
- - downloading from a cheap one fails. -}
- | direction == Download = do
- uuids <- Remote.keyLocations k
- return $ filter (\r -> uuid r `elem` uuids) rs
- {- Upload to all remotes that want the content. -}
- | otherwise = filterM (wantSend f . Remote.uuid) $
- filter (not . Remote.readonly) rs
- gentransfer r = Transfer
- { transferDirection = direction
- , transferKey = k
- , transferUUID = Remote.uuid r
- }
- defer
- {- Defer this download, as no known remote has the key. -}
- | direction == Download = void $ liftIO $ atomically $
- modifyTVar' (deferreddownloads q) $
- \l -> (k, f):l
- | otherwise = noop
+ where
+ go = do
+ rs <- liftAnnex . sufficientremotes
+ =<< syncRemotes <$> getDaemonStatus
+ let matchingrs = filter (matching . Remote.uuid) rs
+ if null matchingrs
+ then defer
+ else forM_ matchingrs $ \r ->
+ enqueue schedule (gentransfer r) (stubInfo f r)
+ sufficientremotes rs
+ {- Queue downloads from all remotes that
+ - have the key, with the cheapest ones first.
+ - More expensive ones will only be tried if
+ - downloading from a cheap one fails. -}
+ | direction == Download = do
+ uuids <- Remote.keyLocations k
+ return $ filter (\r -> uuid r `elem` uuids) rs
+ {- Upload to all remotes that want the content. -}
+ | otherwise = filterM (wantSend f . Remote.uuid) $
+ filter (not . Remote.readonly) rs
+ gentransfer r = Transfer
+ { transferDirection = direction
+ , transferKey = k
+ , transferUUID = Remote.uuid r
+ }
+ defer
+ {- Defer this download, as no known remote has the key. -}
+ | direction == Download = do
+ q <- getAssistant transferQueue
+ void $ liftIO $ atomically $
+ modifyTVar' (deferreddownloads q) $
+ \l -> (k, f):l
+ | otherwise = noop
{- Queues any deferred downloads that can now be accomplished, leaving
- any others in the list to try again later. -}
-queueDeferredDownloads :: Schedule -> TransferQueue -> DaemonStatusHandle -> Annex ()
-queueDeferredDownloads schedule q dstatus = do
+queueDeferredDownloads :: Schedule -> Assistant ()
+queueDeferredDownloads schedule = do
+ q <- getAssistant transferQueue
l <- liftIO $ atomically $ swapTVar (deferreddownloads q) []
- rs <- syncRemotes <$> liftIO (getDaemonStatusOld dstatus)
+ rs <- syncRemotes <$> getDaemonStatus
left <- filterM (queue rs) l
unless (null left) $
liftIO $ atomically $ modifyTVar' (deferreddownloads q) $
\new -> new ++ left
- where
- queue rs (k, f) = do
- uuids <- Remote.keyLocations k
- let sources = filter (\r -> uuid r `elem` uuids) rs
- unless (null sources) $
- forM_ sources $ \r -> liftIO $
- enqueue schedule q dstatus
- (gentransfer r) (stubInfo f r)
- return $ null sources
- where
- gentransfer r = Transfer
- { transferDirection = Download
- , transferKey = k
- , transferUUID = Remote.uuid r
- }
-
-enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
-enqueue schedule q dstatus t info
+ where
+ queue rs (k, f) = do
+ uuids <- liftAnnex $ Remote.keyLocations k
+ let sources = filter (\r -> uuid r `elem` uuids) rs
+ unless (null sources) $
+ forM_ sources $ \r ->
+ enqueue schedule (gentransfer r) (stubInfo f r)
+ return $ null sources
+ where
+ gentransfer r = Transfer
+ { transferDirection = Download
+ , transferKey = k
+ , transferUUID = Remote.uuid r
+ }
+
+enqueue :: Schedule -> Transfer -> TransferInfo -> Assistant ()
+enqueue schedule t info
| schedule == Next = go (new:)
| otherwise = go (\l -> l++[new])
- where
- new = (t, info)
- go modlist = do
- atomically $ do
- void $ modifyTVar' (queuesize q) succ
- void $ modifyTVar' (queuelist q) modlist
- void $ notifyTransferOld dstatus
+ where
+ new = (t, info)
+ go modlist = do
+ q <- getAssistant transferQueue
+ liftIO $ atomically $ do
+ void $ modifyTVar' (queuesize q) succ
+ void $ modifyTVar' (queuelist q) modlist
+ notifyTransfer
{- Adds a transfer to the queue. -}
-queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
-queueTransfer schedule q dstatus f t remote =
- enqueue schedule q dstatus t (stubInfo f remote)
+queueTransfer :: Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant ()
+queueTransfer schedule f t remote = enqueue schedule t (stubInfo f remote)
{- Blocks until the queue is no larger than a given size, and then adds a
- transfer to the queue. -}
-queueTransferAt :: Int -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
-queueTransferAt wantsz schedule q dstatus f t remote = do
- atomically $ do
+queueTransferAt :: Int -> Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant ()
+queueTransferAt wantsz schedule f t remote = do
+ q <- getAssistant transferQueue
+ liftIO $ atomically $ do
sz <- readTVar (queuesize q)
unless (sz <= wantsz) $
retry -- blocks until queuesize changes
- enqueue schedule q dstatus t (stubInfo f remote)
+ enqueue schedule t (stubInfo f remote)
-queueTransferWhenSmall :: TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransferWhenSmall :: AssociatedFile -> Transfer -> Remote -> Assistant ()
queueTransferWhenSmall = queueTransferAt 10 Later
{- Blocks until a pending transfer is available in the queue,
@@ -151,38 +153,45 @@ queueTransferWhenSmall = queueTransferAt 10 Later
-
- This is done in a single STM transaction, so there is no window
- where an observer sees an inconsistent status. -}
-getNextTransfer :: TransferQueue -> DaemonStatusHandle -> (TransferInfo -> Bool) -> IO (Maybe (Transfer, TransferInfo))
-getNextTransfer q dstatus acceptable = atomically $ do
- sz <- readTVar (queuesize q)
- if sz < 1
- then retry -- blocks until queuesize changes
- else do
- (r@(t,info):rest) <- readTVar (queuelist q)
- writeTVar (queuelist q) rest
- void $ modifyTVar' (queuesize q) pred
- if acceptable info
- then do
- adjustTransfersSTM dstatus $
- M.insertWith' const t info
- return $ Just r
- else return Nothing
+getNextTransfer :: (TransferInfo -> Bool) -> Assistant (Maybe (Transfer, TransferInfo))
+getNextTransfer acceptable = do
+ q <- getAssistant transferQueue
+ dstatus <- getAssistant daemonStatusHandle
+ liftIO $ atomically $ do
+ sz <- readTVar (queuesize q)
+ if sz < 1
+ then retry -- blocks until queuesize changes
+ else do
+ (r@(t,info):rest) <- readTVar (queuelist q)
+ writeTVar (queuelist q) rest
+ void $ modifyTVar' (queuesize q) pred
+ if acceptable info
+ then do
+ adjustTransfersSTM dstatus $
+ M.insertWith' const t info
+ return $ Just r
+ else return Nothing
{- Moves transfers matching a condition from the queue, to the
- currentTransfers map. -}
-getMatchingTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) -> IO [(Transfer, TransferInfo)]
-getMatchingTransfers q dstatus c = atomically $ do
- ts <- dequeueTransfersSTM q c
- unless (null ts) $
- adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts
- return ts
+getMatchingTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)]
+getMatchingTransfers c = do
+ q <- getAssistant transferQueue
+ dstatus <- getAssistant daemonStatusHandle
+ liftIO $ atomically $ do
+ ts <- dequeueTransfersSTM q c
+ unless (null ts) $
+ adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts
+ return ts
{- Removes transfers matching a condition from the queue, and returns the
- removed transfers. -}
-dequeueTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) -> IO [(Transfer, TransferInfo)]
-dequeueTransfers q dstatus c = do
- removed <- atomically $ dequeueTransfersSTM q c
+dequeueTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)]
+dequeueTransfers c = do
+ q <- getAssistant transferQueue
+ removed <- liftIO $ atomically $ dequeueTransfersSTM q c
unless (null removed) $
- notifyTransferOld dstatus
+ notifyTransfer
return removed
dequeueTransfersSTM :: TransferQueue -> (Transfer -> Bool) -> STM [(Transfer, TransferInfo)]
diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs
index 8afd23a12..80a062e36 100644
--- a/Assistant/TransferSlots.hs
+++ b/Assistant/TransferSlots.hs
@@ -17,7 +17,7 @@ import qualified Control.Exception as E
import Control.Concurrent
import qualified Control.Concurrent.MSemN as MSemN
-type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, IO ()))
+type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Assistant ()))
{- Waits until a transfer slot becomes available, then runs a
- TransferGenerator, and then runs the transfer action in its own thread.
@@ -44,26 +44,30 @@ inImmediateTransferSlot gen = do
- then pausing the thread until a ResumeTransfer exception is raised,
- then rerunning the action.
-}
-runTransferThread :: Maybe (Transfer, TransferInfo, IO ()) -> Assistant ()
+runTransferThread :: Maybe (Transfer, TransferInfo, Assistant ()) -> Assistant ()
runTransferThread Nothing = flip MSemN.signal 1 <<~ transferSlots
runTransferThread (Just (t, info, a)) = do
d <- getAssistant id
- tid <- liftIO $ forkIO $ go d
+ aio <- asIO a
+ tid <- liftIO $ forkIO $ runTransferThread' d aio
updateTransferInfo t $ info { transferTid = Just tid }
+
+runTransferThread' :: AssistantData -> IO () -> IO ()
+runTransferThread' d a = go
where
- go d = catchPauseResume d a
- pause d = catchPauseResume d $ runEvery (Seconds 86400) noop
+ go = catchPauseResume a
+ pause = catchPauseResume $ runEvery (Seconds 86400) noop
{- Note: This must use E.try, rather than E.catch.
- When E.catch is used, and has called go in its exception
- handler, Control.Concurrent.throwTo will block sometimes
- when signaling. Using E.try avoids the problem. -}
- catchPauseResume d a' = do
+ catchPauseResume a' = do
r <- E.try a' :: IO (Either E.SomeException ())
case r of
Left e -> case E.fromException e of
- Just PauseTransfer -> pause d
- Just ResumeTransfer -> go d
- _ -> done d
- _ -> done d
- done d = flip runAssistant d $
+ Just PauseTransfer -> pause
+ Just ResumeTransfer -> go
+ _ -> done
+ _ -> done
+ done = flip runAssistant d $
flip MSemN.signal 1 <<~ transferSlots