diff options
-rw-r--r-- | Assistant/DaemonStatus.hs | 23 | ||||
-rw-r--r-- | Assistant/Drop.hs | 13 | ||||
-rw-r--r-- | Assistant/Monad.hs | 9 | ||||
-rw-r--r-- | Assistant/Pairing/Network.hs | 85 | ||||
-rw-r--r-- | Assistant/Threads/Committer.hs | 4 | ||||
-rw-r--r-- | Assistant/Threads/Merger.hs | 7 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 4 | ||||
-rw-r--r-- | Assistant/Threads/NetWatcher.hs | 2 | ||||
-rw-r--r-- | Assistant/Threads/PairListener.hs | 18 | ||||
-rw-r--r-- | Assistant/Threads/PushNotifier.hs | 8 | ||||
-rw-r--r-- | Assistant/Threads/TransferScanner.hs | 42 | ||||
-rw-r--r-- | Assistant/Threads/TransferWatcher.hs | 12 | ||||
-rw-r--r-- | Assistant/Threads/Transferrer.hs | 14 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 11 | ||||
-rw-r--r-- | Assistant/TransferQueue.hs | 209 | ||||
-rw-r--r-- | Assistant/TransferSlots.hs | 26 |
16 files changed, 230 insertions, 257 deletions
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index 4223b6ce9..4744c86ba 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -24,30 +24,12 @@ import Data.Time import System.Locale import qualified Data.Map as M --- TODO remove this -getDaemonStatusOld :: DaemonStatusHandle -> IO DaemonStatus -getDaemonStatusOld = atomically . readTMVar - getDaemonStatus :: Assistant DaemonStatus getDaemonStatus = (atomically . readTMVar) <<~ daemonStatusHandle --- TODO remove this -modifyDaemonStatusOld_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> IO () -modifyDaemonStatusOld_ dstatus a = modifyDaemonStatusOld dstatus $ \s -> (a s, ()) - modifyDaemonStatus_ :: (DaemonStatus -> DaemonStatus) -> Assistant () modifyDaemonStatus_ a = modifyDaemonStatus $ \s -> (a s, ()) --- TODO remove this -modifyDaemonStatusOld :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> IO b -modifyDaemonStatusOld dstatus a = do - (s, b) <- atomically $ do - r@(s, _) <- a <$> takeTMVar dstatus - putTMVar dstatus s - return r - sendNotification $ changeNotifier s - return b - modifyDaemonStatus :: (DaemonStatus -> (DaemonStatus, b)) -> Assistant b modifyDaemonStatus a = do dstatus <- getAssistant daemonStatusHandle @@ -188,11 +170,6 @@ notifyTransfer = do liftIO $ sendNotification =<< transferNotifier <$> atomically (readTMVar dstatus) --- TODO remove -notifyTransferOld :: DaemonStatusHandle -> IO () -notifyTransferOld dstatus = sendNotification - =<< transferNotifier <$> atomically (readTMVar dstatus) - {- Send a notification when alerts are changed. -} notifyAlert :: Assistant () notifyAlert = do diff --git a/Assistant/Drop.hs b/Assistant/Drop.hs index 021e40a87..ed9ba577e 100644 --- a/Assistant/Drop.hs +++ b/Assistant/Drop.hs @@ -20,12 +20,13 @@ import Config {- Drop from local and/or remote when allowed by the preferred content and - numcopies settings. -} -handleDrops :: DaemonStatusHandle -> Bool -> Key -> AssociatedFile -> Annex () -handleDrops _ _ _ Nothing = noop -handleDrops dstatus fromhere key f = do - syncrs <- liftIO $ syncRemotes <$> getDaemonStatusOld dstatus - locs <- loggedLocations key - handleDrops' locs syncrs fromhere key f +handleDrops :: Bool -> Key -> AssociatedFile -> Assistant () +handleDrops _ _ Nothing = noop +handleDrops fromhere key f = do + syncrs <- syncRemotes <$> getDaemonStatus + liftAnnex $ do + locs <- loggedLocations key + handleDrops' locs syncrs fromhere key f handleDrops' :: [UUID] -> [Remote] -> Bool -> Key -> AssociatedFile -> Annex () handleDrops' _ _ _ _ Nothing = noop diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs index 0dfd4e34d..fb4cb3340 100644 --- a/Assistant/Monad.hs +++ b/Assistant/Monad.hs @@ -17,6 +17,7 @@ module Assistant.Monad ( (<~>), (<<~), asIO, + asIO1, asIO2, ) where @@ -95,12 +96,16 @@ io <~> a = do liftIO $ io $ runAssistant a d {- Creates an IO action that will run an Assistant action when run. -} -asIO :: (a -> Assistant b) -> Assistant (a -> IO b) +asIO :: Assistant a -> Assistant (IO a) asIO a = do d <- reader id + return $ runAssistant a d + +asIO1 :: (a -> Assistant b) -> Assistant (a -> IO b) +asIO1 a = do + d <- reader id return $ \v -> runAssistant (a v) d -{- Creates an IO action that will run an Assistant action when run. -} asIO2 :: (a -> b -> Assistant c) -> Assistant (a -> b -> IO c) asIO2 a = do d <- reader id diff --git a/Assistant/Pairing/Network.hs b/Assistant/Pairing/Network.hs index 9b030617e..9ee1db3c6 100644 --- a/Assistant/Pairing/Network.hs +++ b/Assistant/Pairing/Network.hs @@ -50,47 +50,50 @@ multicastAddress (IPv6Addr _) = "ff02::fb" -} multicastPairMsg :: Maybe Int -> Secret -> PairData -> PairStage -> IO () multicastPairMsg repeats secret pairdata stage = go M.empty repeats - where - go _ (Just 0) = noop - go cache n = do - addrs <- activeNetworkAddresses - let cache' = updatecache cache addrs - mapM_ (sendinterface cache') addrs - threadDelaySeconds (Seconds 2) - go cache' $ pred <$> n - {- The multicast library currently chokes on ipv6 addresses. -} - sendinterface _ (IPv6Addr _) = noop - sendinterface cache i = void $ catchMaybeIO $ - withSocketsDo $ bracket setup cleanup use - where - setup = multicastSender (multicastAddress i) pairingPort - cleanup (sock, _) = sClose sock -- FIXME does not work - use (sock, addr) = do - setInterface sock (showAddr i) - maybe noop (\s -> void $ sendTo sock s addr) - (M.lookup i cache) - updatecache cache [] = cache - updatecache cache (i:is) - | M.member i cache = updatecache cache is - | otherwise = updatecache (M.insert i (show $ mkmsg i) cache) is - mkmsg addr = PairMsg $ - mkVerifiable (stage, pairdata, addr) secret + where + go _ (Just 0) = noop + go cache n = do + addrs <- activeNetworkAddresses + let cache' = updatecache cache addrs + mapM_ (sendinterface cache') addrs + threadDelaySeconds (Seconds 2) + go cache' $ pred <$> n + {- The multicast library currently chokes on ipv6 addresses. -} + sendinterface _ (IPv6Addr _) = noop + sendinterface cache i = void $ catchMaybeIO $ + withSocketsDo $ bracket setup cleanup use + where + setup = multicastSender (multicastAddress i) pairingPort + cleanup (sock, _) = sClose sock -- FIXME does not work + use (sock, addr) = do + setInterface sock (showAddr i) + maybe noop (\s -> void $ sendTo sock s addr) + (M.lookup i cache) + updatecache cache [] = cache + updatecache cache (i:is) + | M.member i cache = updatecache cache is + | otherwise = updatecache (M.insert i (show $ mkmsg i) cache) is + mkmsg addr = PairMsg $ + mkVerifiable (stage, pairdata, addr) secret -startSending :: DaemonStatusHandle -> PairingInProgress -> PairStage -> (PairStage -> IO ()) -> IO () -startSending dstatus pip stage sender = void $ forkIO $ do - tid <- myThreadId - let pip' = pip { inProgressPairStage = stage, inProgressThreadId = Just tid } - oldpip <- modifyDaemonStatusOld dstatus $ - \s -> (s { pairingInProgress = Just pip' }, pairingInProgress s) - maybe noop stopold oldpip - sender stage - where - stopold = maybe noop killThread . inProgressThreadId +startSending :: PairingInProgress -> PairStage -> (PairStage -> IO ()) -> Assistant () +startSending pip stage sender = do + a <- asIO start + void $ liftIO $ forkIO a + where + start = do + tid <- liftIO myThreadId + let pip' = pip { inProgressPairStage = stage, inProgressThreadId = Just tid } + oldpip <- modifyDaemonStatus $ + \s -> (s { pairingInProgress = Just pip' }, pairingInProgress s) + maybe noop stopold oldpip + liftIO $ sender stage + stopold = maybe noop (liftIO . killThread) . inProgressThreadId -stopSending :: PairingInProgress -> DaemonStatusHandle -> IO () -stopSending pip dstatus = do - maybe noop killThread $ inProgressThreadId pip - modifyDaemonStatusOld_ dstatus $ \s -> s { pairingInProgress = Nothing } +stopSending :: PairingInProgress -> Assistant () +stopSending pip = do + maybe noop (liftIO . killThread) $ inProgressThreadId pip + modifyDaemonStatus_ $ \s -> s { pairingInProgress = Nothing } class ToSomeAddr a where toSomeAddr :: a -> SomeAddr @@ -123,5 +126,5 @@ pairRepo msg = concat , ":" , remoteDirectory d ] - where - d = pairMsgData msg + where + d = pairMsgData msg diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index d73dc1eb0..445e44dea 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -202,9 +202,7 @@ handleAdds delayadd cs = returnWhen (null incomplete) $ do Git.HashObject.hashObject BlobObject link stageSymlink file sha showEndOk - transferqueue <- getAssistant transferQueue - dstatus <- getAssistant daemonStatusHandle - liftAnnex $ queueTransfers Next transferqueue dstatus key (Just file) Upload + queueTransfers Next key (Just file) Upload return $ Just change {- Check that the keysource's keyFilename still exists, diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs index 46511701c..44056dc35 100644 --- a/Assistant/Threads/Merger.hs +++ b/Assistant/Threads/Merger.hs @@ -67,11 +67,8 @@ onAdd file | ".lock" `isSuffixOf` file = noop | isAnnexBranch file = do branchChanged - transferqueue <- getAssistant transferQueue - dstatus <- getAssistant daemonStatusHandle - liftAnnex $ - whenM Annex.Branch.forceUpdate $ - queueDeferredDownloads Later transferqueue dstatus + whenM (liftAnnex Annex.Branch.forceUpdate) $ + queueDeferredDownloads Later | "/synced/" `isInfixOf` file = do mergecurrent =<< liftAnnex (inRepo Git.Branch.current) | otherwise = noop diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index bb63e840f..d3da50dd4 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -48,7 +48,7 @@ mountWatcherThread = NamedThread "MountWatcher" $ dbusThread :: Assistant () dbusThread = do - runclient <- asIO go + runclient <- asIO1 go r <- liftIO $ E.try $ runClient getSessionAddress runclient either onerr (const noop) r where @@ -59,7 +59,7 @@ dbusThread = do - mount point from the dbus message, but this is - easier. -} mvar <- liftIO $ newMVar =<< currentMountPoints - handleevent <- asIO $ \_event -> do + handleevent <- asIO1 $ \_event -> do nowmounted <- liftIO $ currentMountPoints wasmounted <- liftIO $ swapMVar mvar nowmounted handleMounts wasmounted nowmounted diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index 9df4f3a4d..4396b2632 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -49,7 +49,7 @@ netWatcherFallbackThread = NamedThread "NetWatcherFallback" $ dbusThread :: Assistant () dbusThread = do handleerr <- asIO2 onerr - runclient <- asIO go + runclient <- asIO1 go liftIO $ persistentClient getSystemAddress () handleerr runclient where go client = ifM (checkNetMonitor client) diff --git a/Assistant/Threads/PairListener.hs b/Assistant/Threads/PairListener.hs index f682dd6da..f29bec4b4 100644 --- a/Assistant/Threads/PairListener.hs +++ b/Assistant/Threads/PairListener.hs @@ -27,7 +27,7 @@ thisThread = "PairListener" pairListenerThread :: UrlRenderer -> NamedThread pairListenerThread urlrenderer = NamedThread "PairListener" $ do - listener <- asIO $ go [] [] + listener <- asIO1 $ go [] [] liftIO $ withSocketsDo $ runEvery (Seconds 1) $ void $ tryIO $ listener =<< getsock @@ -69,7 +69,7 @@ pairListenerThread urlrenderer = NamedThread "PairListener" $ do | not verified && sameuuid = do liftAnnex $ warning "detected possible pairing brute force attempt; disabled pairing" - stopSending pip <<~ daemonStatusHandle + stopSending pip return (Nothing, False) |otherwise = return (Just pip, verified && sameuuid) where @@ -104,7 +104,7 @@ pairReqReceived :: Bool -> UrlRenderer -> PairMsg -> Assistant () pairReqReceived True _ _ = noop -- ignore our own PairReq pairReqReceived False urlrenderer msg = do url <- liftIO $ renderUrl urlrenderer (FinishPairR msg) [] - close <- asIO removeAlert + close <- asIO1 removeAlert void $ addAlert $ pairRequestReceivedAlert repo AlertButton { buttonUrl = url @@ -119,11 +119,10 @@ pairReqReceived False urlrenderer msg = do - and send a single PairDone. -} pairAckReceived :: Bool -> Maybe PairingInProgress -> PairMsg -> [PairingInProgress] -> Assistant [PairingInProgress] pairAckReceived True (Just pip) msg cache = do - stopSending pip <<~ daemonStatusHandle + stopSending pip liftIO $ setupAuthorizedKeys msg finishedPairing msg (inProgressSshKeyPair pip) - dstatus <- getAssistant daemonStatusHandle - liftIO $ startSending dstatus pip PairDone $ multicastPairMsg + startSending pip PairDone $ multicastPairMsg (Just 1) (inProgressSecret pip) (inProgressPairData pip) return $ pip : take 10 cache {- A stale PairAck might also be seen, after we've finished pairing. @@ -132,10 +131,9 @@ pairAckReceived True (Just pip) msg cache = do - response to stale PairAcks for them. -} pairAckReceived _ _ msg cache = do let pips = filter (verifiedPairMsg msg) cache - dstatus <- getAssistant daemonStatusHandle unless (null pips) $ - liftIO $ forM_ pips $ \pip -> - startSending dstatus pip PairDone $ multicastPairMsg + forM_ pips $ \pip -> + startSending pip PairDone $ multicastPairMsg (Just 1) (inProgressSecret pip) (inProgressPairData pip) return cache @@ -152,5 +150,5 @@ pairDoneReceived :: Bool -> Maybe PairingInProgress -> PairMsg -> Assistant () pairDoneReceived False _ _ = noop -- not verified pairDoneReceived True Nothing _ = noop -- not in progress pairDoneReceived True (Just pip) msg = do - stopSending pip <<~ daemonStatusHandle + stopSending pip finishedPairing msg (inProgressSshKeyPair pip) diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs index b50a2e4b9..d2d5e08bf 100644 --- a/Assistant/Threads/PushNotifier.hs +++ b/Assistant/Threads/PushNotifier.hs @@ -26,10 +26,10 @@ import Data.Time.Clock pushNotifierThread :: NamedThread pushNotifierThread = NamedThread "PushNotifier" $ do - iodebug <- asIO debug - iopull <- asIO pull - iowaitpush <- asIO $ const waitPush - ioclient <- asIO2 $ xmppClient $ iowaitpush () + iodebug <- asIO1 debug + iopull <- asIO1 pull + iowaitpush <- asIO $ waitPush + ioclient <- asIO2 $ xmppClient $ iowaitpush forever $ do tid <- liftIO $ forkIO $ ioclient iodebug iopull waitRestart diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs index c37b1e3b9..3b3c3f304 100644 --- a/Assistant/Threads/TransferScanner.hs +++ b/Assistant/Threads/TransferScanner.hs @@ -21,7 +21,7 @@ import qualified Remote import qualified Types.Remote as Remote import Utility.ThreadScheduler import qualified Git.LsFiles as LsFiles -import Command +import qualified Backend import Annex.Content import Annex.Wanted @@ -78,11 +78,7 @@ failedTransferScan r = do - that the remote doesn't already have the - key, so it's not redundantly checked here. -} requeue t info - requeue t info = do - transferqueue <- getAssistant transferQueue - dstatus <- getAssistant daemonStatusHandle - liftIO $ queueTransferWhenSmall - transferqueue dstatus (associatedFile info) t r + requeue t info = queueTransferWhenSmall (associatedFile info) t r {- This is a expensive scan through the full git work tree, finding - files to transfer. The scan is blocked when the transfer queue gets @@ -101,10 +97,9 @@ expensiveScan rs = unless onlyweb $ do void $ alertWhile (scanAlert visiblers) $ do g <- liftAnnex gitRepo (files, cleanup) <- liftIO $ LsFiles.inRepo [] g - dstatus <- getAssistant daemonStatusHandle forM_ files $ \f -> do - ts <- liftAnnex $ - ifAnnexed f (findtransfers dstatus f) (return []) + ts <- maybe (return []) (findtransfers f) + =<< liftAnnex (Backend.lookupFile f) mapM_ (enqueue f) ts void $ liftIO cleanup return True @@ -115,25 +110,24 @@ expensiveScan rs = unless onlyweb $ do in if null rs' then rs else rs' enqueue f (r, t) = do debug ["queuing", show t] - transferqueue <- getAssistant transferQueue - dstatus <- getAssistant daemonStatusHandle - liftIO $ queueTransferWhenSmall transferqueue dstatus (Just f) t r - findtransfers dstatus f (key, _) = do - locs <- loggedLocations key + queueTransferWhenSmall (Just f) t r + findtransfers f (key, _) = do {- The syncable remotes may have changed since this - scan began. -} - syncrs <- liftIO $ syncRemotes <$> getDaemonStatusOld dstatus - present <- inAnnex key + syncrs <- syncRemotes <$> getDaemonStatus + liftAnnex $ do + locs <- loggedLocations key + present <- inAnnex key - handleDrops' locs syncrs present key (Just f) + handleDrops' locs syncrs present key (Just f) - let slocs = S.fromList locs - let use a = return $ catMaybes $ map (a key slocs) syncrs - if present - then filterM (wantSend (Just f) . Remote.uuid . fst) - =<< use (genTransfer Upload False) - else ifM (wantGet $ Just f) - ( use (genTransfer Download True) , return [] ) + let slocs = S.fromList locs + let use a = return $ catMaybes $ map (a key slocs) syncrs + if present + then filterM (wantSend (Just f) . Remote.uuid . fst) + =<< use (genTransfer Upload False) + else ifM (wantGet $ Just f) + ( use (genTransfer Download True) , return [] ) genTransfer :: Direction -> Bool -> Key -> S.Set UUID -> Remote -> Maybe (Remote, Transfer) genTransfer direction want key slocs r diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index f18a2acd8..7b789b8b6 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -115,15 +115,9 @@ finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant () finishedTransfer t (Just info) | transferDirection t == Download = whenM (liftAnnex $ inAnnex $ transferKey t) $ do - dstatus <- getAssistant daemonStatusHandle - transferqueue <- getAssistant transferQueue - liftAnnex $ handleDrops dstatus False - (transferKey t) (associatedFile info) - liftAnnex $ queueTransfersMatching (/= transferUUID t) - Later transferqueue dstatus + handleDrops False (transferKey t) (associatedFile info) + queueTransfersMatching (/= transferUUID t) Later (transferKey t) (associatedFile info) Upload - | otherwise = do - dstatus <- getAssistant daemonStatusHandle - liftAnnex $ handleDrops dstatus True (transferKey t) (associatedFile info) + | otherwise = handleDrops True (transferKey t) (associatedFile info) finishedTransfer _ _ = noop diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index c60790f9b..84013eaa7 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -30,26 +30,22 @@ maxTransfers = 1 transfererThread :: NamedThread transfererThread = NamedThread "Transferr" $ do program <- liftIO readProgramFile - transferqueue <- getAssistant transferQueue - dstatus <- getAssistant daemonStatusHandle - starter <- asIO2 $ startTransfer program - forever $ inTransferSlot $ liftIO $ - maybe (return Nothing) (uncurry starter) - =<< getNextTransfer transferqueue dstatus notrunning + forever $ inTransferSlot $ + maybe (return Nothing) (uncurry $ startTransfer program) + =<< getNextTransfer notrunning where {- Skip transfers that are already running. -} notrunning = isNothing . startedTime {- By the time this is called, the daemonstatus's transfer map should - already have been updated to include the transfer. -} -startTransfer :: FilePath -> Transfer -> TransferInfo -> Assistant (Maybe (Transfer, TransferInfo, IO ())) +startTransfer :: FilePath -> Transfer -> TransferInfo -> Assistant (Maybe (Transfer, TransferInfo, Assistant ())) startTransfer program t info = case (transferRemote info, associatedFile info) of (Just remote, Just file) -> ifM (liftAnnex $ shouldTransfer t info) ( do debug [ "Transferring:" , show t ] notifyTransfer - tp <- asIO2 transferprocess - return $ Just (t, info, tp remote file) + return $ Just (t, info, transferprocess remote file) , do debug [ "Skipping unnecessary transfer:" , show t ] void $ removeTransfer t diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index 7dcde1f46..a74976deb 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -54,7 +54,7 @@ needLsof = error $ unlines watchThread :: NamedThread watchThread = NamedThread "Watcher" $ do - startup <- asIO startupScan + startup <- asIO1 startupScan addhook <- hook onAdd delhook <- hook onDel addsymlinkhook <- hook onAddSymlink @@ -182,12 +182,9 @@ onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file) checkcontent key daemonstatus | scanComplete daemonstatus = do present <- liftAnnex $ inAnnex key - dstatus <- getAssistant daemonStatusHandle - unless present $ do - transferqueue <- getAssistant transferQueue - liftAnnex $ queueTransfers Next transferqueue - dstatus key (Just file) Download - liftAnnex $ handleDrops dstatus present key (Just file) + unless present $ + queueTransfers Next key (Just file) Download + handleDrops present key (Just file) | otherwise = noop onDel :: Handler diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index 11c2d58d8..8e403cc43 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -21,9 +21,8 @@ module Assistant.TransferQueue ( dequeueTransfers, ) where -import Common.Annex +import Assistant.Common import Assistant.DaemonStatus -import Assistant.Types.DaemonStatus import Assistant.Types.TransferQueue import Logs.Transfer import Types.Remote @@ -35,8 +34,8 @@ import Control.Concurrent.STM import qualified Data.Map as M {- Reads the queue's content without blocking or changing it. -} -getTransferQueue :: TransferQueue -> IO [(Transfer, TransferInfo)] -getTransferQueue q = atomically $ readTVar $ queuelist q +getTransferQueue :: Assistant [(Transfer, TransferInfo)] +getTransferQueue = (atomically . readTVar . queuelist) <<~ transferQueue stubInfo :: AssociatedFile -> Remote -> TransferInfo stubInfo f r = stubTransferInfo @@ -46,101 +45,104 @@ stubInfo f r = stubTransferInfo {- Adds transfers to queue for some of the known remotes. - Honors preferred content settings, only transferring wanted files. -} -queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () +queueTransfers :: Schedule -> Key -> AssociatedFile -> Direction -> Assistant () queueTransfers = queueTransfersMatching (const True) {- Adds transfers to queue for some of the known remotes, that match a - condition. Honors preferred content settings. -} -queueTransfersMatching :: (UUID -> Bool) -> Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () -queueTransfersMatching matching schedule q dstatus k f direction - | direction == Download = whenM (wantGet f) go +queueTransfersMatching :: (UUID -> Bool) -> Schedule -> Key -> AssociatedFile -> Direction -> Assistant () +queueTransfersMatching matching schedule k f direction + | direction == Download = whenM (liftAnnex $ wantGet f) go | otherwise = go - where - go = do - rs <- sufficientremotes - =<< syncRemotes <$> liftIO (getDaemonStatusOld dstatus) - let matchingrs = filter (matching . Remote.uuid) rs - if null matchingrs - then defer - else forM_ matchingrs $ \r -> liftIO $ - enqueue schedule q dstatus (gentransfer r) (stubInfo f r) - sufficientremotes rs - {- Queue downloads from all remotes that - - have the key, with the cheapest ones first. - - More expensive ones will only be tried if - - downloading from a cheap one fails. -} - | direction == Download = do - uuids <- Remote.keyLocations k - return $ filter (\r -> uuid r `elem` uuids) rs - {- Upload to all remotes that want the content. -} - | otherwise = filterM (wantSend f . Remote.uuid) $ - filter (not . Remote.readonly) rs - gentransfer r = Transfer - { transferDirection = direction - , transferKey = k - , transferUUID = Remote.uuid r - } - defer - {- Defer this download, as no known remote has the key. -} - | direction == Download = void $ liftIO $ atomically $ - modifyTVar' (deferreddownloads q) $ - \l -> (k, f):l - | otherwise = noop + where + go = do + rs <- liftAnnex . sufficientremotes + =<< syncRemotes <$> getDaemonStatus + let matchingrs = filter (matching . Remote.uuid) rs + if null matchingrs + then defer + else forM_ matchingrs $ \r -> + enqueue schedule (gentransfer r) (stubInfo f r) + sufficientremotes rs + {- Queue downloads from all remotes that + - have the key, with the cheapest ones first. + - More expensive ones will only be tried if + - downloading from a cheap one fails. -} + | direction == Download = do + uuids <- Remote.keyLocations k + return $ filter (\r -> uuid r `elem` uuids) rs + {- Upload to all remotes that want the content. -} + | otherwise = filterM (wantSend f . Remote.uuid) $ + filter (not . Remote.readonly) rs + gentransfer r = Transfer + { transferDirection = direction + , transferKey = k + , transferUUID = Remote.uuid r + } + defer + {- Defer this download, as no known remote has the key. -} + | direction == Download = do + q <- getAssistant transferQueue + void $ liftIO $ atomically $ + modifyTVar' (deferreddownloads q) $ + \l -> (k, f):l + | otherwise = noop {- Queues any deferred downloads that can now be accomplished, leaving - any others in the list to try again later. -} -queueDeferredDownloads :: Schedule -> TransferQueue -> DaemonStatusHandle -> Annex () -queueDeferredDownloads schedule q dstatus = do +queueDeferredDownloads :: Schedule -> Assistant () +queueDeferredDownloads schedule = do + q <- getAssistant transferQueue l <- liftIO $ atomically $ swapTVar (deferreddownloads q) [] - rs <- syncRemotes <$> liftIO (getDaemonStatusOld dstatus) + rs <- syncRemotes <$> getDaemonStatus left <- filterM (queue rs) l unless (null left) $ liftIO $ atomically $ modifyTVar' (deferreddownloads q) $ \new -> new ++ left - where - queue rs (k, f) = do - uuids <- Remote.keyLocations k - let sources = filter (\r -> uuid r `elem` uuids) rs - unless (null sources) $ - forM_ sources $ \r -> liftIO $ - enqueue schedule q dstatus - (gentransfer r) (stubInfo f r) - return $ null sources - where - gentransfer r = Transfer - { transferDirection = Download - , transferKey = k - , transferUUID = Remote.uuid r - } - -enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO () -enqueue schedule q dstatus t info + where + queue rs (k, f) = do + uuids <- liftAnnex $ Remote.keyLocations k + let sources = filter (\r -> uuid r `elem` uuids) rs + unless (null sources) $ + forM_ sources $ \r -> + enqueue schedule (gentransfer r) (stubInfo f r) + return $ null sources + where + gentransfer r = Transfer + { transferDirection = Download + , transferKey = k + , transferUUID = Remote.uuid r + } + +enqueue :: Schedule -> Transfer -> TransferInfo -> Assistant () +enqueue schedule t info | schedule == Next = go (new:) | otherwise = go (\l -> l++[new]) - where - new = (t, info) - go modlist = do - atomically $ do - void $ modifyTVar' (queuesize q) succ - void $ modifyTVar' (queuelist q) modlist - void $ notifyTransferOld dstatus + where + new = (t, info) + go modlist = do + q <- getAssistant transferQueue + liftIO $ atomically $ do + void $ modifyTVar' (queuesize q) succ + void $ modifyTVar' (queuelist q) modlist + notifyTransfer {- Adds a transfer to the queue. -} -queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () -queueTransfer schedule q dstatus f t remote = - enqueue schedule q dstatus t (stubInfo f remote) +queueTransfer :: Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant () +queueTransfer schedule f t remote = enqueue schedule t (stubInfo f remote) {- Blocks until the queue is no larger than a given size, and then adds a - transfer to the queue. -} -queueTransferAt :: Int -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () -queueTransferAt wantsz schedule q dstatus f t remote = do - atomically $ do +queueTransferAt :: Int -> Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant () +queueTransferAt wantsz schedule f t remote = do + q <- getAssistant transferQueue + liftIO $ atomically $ do sz <- readTVar (queuesize q) unless (sz <= wantsz) $ retry -- blocks until queuesize changes - enqueue schedule q dstatus t (stubInfo f remote) + enqueue schedule t (stubInfo f remote) -queueTransferWhenSmall :: TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () +queueTransferWhenSmall :: AssociatedFile -> Transfer -> Remote -> Assistant () queueTransferWhenSmall = queueTransferAt 10 Later {- Blocks until a pending transfer is available in the queue, @@ -151,38 +153,45 @@ queueTransferWhenSmall = queueTransferAt 10 Later - - This is done in a single STM transaction, so there is no window - where an observer sees an inconsistent status. -} -getNextTransfer :: TransferQueue -> DaemonStatusHandle -> (TransferInfo -> Bool) -> IO (Maybe (Transfer, TransferInfo)) -getNextTransfer q dstatus acceptable = atomically $ do - sz <- readTVar (queuesize q) - if sz < 1 - then retry -- blocks until queuesize changes - else do - (r@(t,info):rest) <- readTVar (queuelist q) - writeTVar (queuelist q) rest - void $ modifyTVar' (queuesize q) pred - if acceptable info - then do - adjustTransfersSTM dstatus $ - M.insertWith' const t info - return $ Just r - else return Nothing +getNextTransfer :: (TransferInfo -> Bool) -> Assistant (Maybe (Transfer, TransferInfo)) +getNextTransfer acceptable = do + q <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ atomically $ do + sz <- readTVar (queuesize q) + if sz < 1 + then retry -- blocks until queuesize changes + else do + (r@(t,info):rest) <- readTVar (queuelist q) + writeTVar (queuelist q) rest + void $ modifyTVar' (queuesize q) pred + if acceptable info + then do + adjustTransfersSTM dstatus $ + M.insertWith' const t info + return $ Just r + else return Nothing {- Moves transfers matching a condition from the queue, to the - currentTransfers map. -} -getMatchingTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) -> IO [(Transfer, TransferInfo)] -getMatchingTransfers q dstatus c = atomically $ do - ts <- dequeueTransfersSTM q c - unless (null ts) $ - adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts - return ts +getMatchingTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)] +getMatchingTransfers c = do + q <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ atomically $ do + ts <- dequeueTransfersSTM q c + unless (null ts) $ + adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts + return ts {- Removes transfers matching a condition from the queue, and returns the - removed transfers. -} -dequeueTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) -> IO [(Transfer, TransferInfo)] -dequeueTransfers q dstatus c = do - removed <- atomically $ dequeueTransfersSTM q c +dequeueTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)] +dequeueTransfers c = do + q <- getAssistant transferQueue + removed <- liftIO $ atomically $ dequeueTransfersSTM q c unless (null removed) $ - notifyTransferOld dstatus + notifyTransfer return removed dequeueTransfersSTM :: TransferQueue -> (Transfer -> Bool) -> STM [(Transfer, TransferInfo)] diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 8afd23a12..80a062e36 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -17,7 +17,7 @@ import qualified Control.Exception as E import Control.Concurrent import qualified Control.Concurrent.MSemN as MSemN -type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, IO ())) +type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Assistant ())) {- Waits until a transfer slot becomes available, then runs a - TransferGenerator, and then runs the transfer action in its own thread. @@ -44,26 +44,30 @@ inImmediateTransferSlot gen = do - then pausing the thread until a ResumeTransfer exception is raised, - then rerunning the action. -} -runTransferThread :: Maybe (Transfer, TransferInfo, IO ()) -> Assistant () +runTransferThread :: Maybe (Transfer, TransferInfo, Assistant ()) -> Assistant () runTransferThread Nothing = flip MSemN.signal 1 <<~ transferSlots runTransferThread (Just (t, info, a)) = do d <- getAssistant id - tid <- liftIO $ forkIO $ go d + aio <- asIO a + tid <- liftIO $ forkIO $ runTransferThread' d aio updateTransferInfo t $ info { transferTid = Just tid } + +runTransferThread' :: AssistantData -> IO () -> IO () +runTransferThread' d a = go where - go d = catchPauseResume d a - pause d = catchPauseResume d $ runEvery (Seconds 86400) noop + go = catchPauseResume a + pause = catchPauseResume $ runEvery (Seconds 86400) noop {- Note: This must use E.try, rather than E.catch. - When E.catch is used, and has called go in its exception - handler, Control.Concurrent.throwTo will block sometimes - when signaling. Using E.try avoids the problem. -} - catchPauseResume d a' = do + catchPauseResume a' = do r <- E.try a' :: IO (Either E.SomeException ()) case r of Left e -> case E.fromException e of - Just PauseTransfer -> pause d - Just ResumeTransfer -> go d - _ -> done d - _ -> done d - done d = flip runAssistant d $ + Just PauseTransfer -> pause + Just ResumeTransfer -> go + _ -> done + _ -> done + done = flip runAssistant d $ flip MSemN.signal 1 <<~ transferSlots |