diff options
Diffstat (limited to 'Assistant/TransferQueue.hs')
-rw-r--r-- | Assistant/TransferQueue.hs | 209 |
1 files changed, 109 insertions, 100 deletions
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index 11c2d58d8..8e403cc43 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -21,9 +21,8 @@ module Assistant.TransferQueue ( dequeueTransfers, ) where -import Common.Annex +import Assistant.Common import Assistant.DaemonStatus -import Assistant.Types.DaemonStatus import Assistant.Types.TransferQueue import Logs.Transfer import Types.Remote @@ -35,8 +34,8 @@ import Control.Concurrent.STM import qualified Data.Map as M {- Reads the queue's content without blocking or changing it. -} -getTransferQueue :: TransferQueue -> IO [(Transfer, TransferInfo)] -getTransferQueue q = atomically $ readTVar $ queuelist q +getTransferQueue :: Assistant [(Transfer, TransferInfo)] +getTransferQueue = (atomically . readTVar . queuelist) <<~ transferQueue stubInfo :: AssociatedFile -> Remote -> TransferInfo stubInfo f r = stubTransferInfo @@ -46,101 +45,104 @@ stubInfo f r = stubTransferInfo {- Adds transfers to queue for some of the known remotes. - Honors preferred content settings, only transferring wanted files. -} -queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () +queueTransfers :: Schedule -> Key -> AssociatedFile -> Direction -> Assistant () queueTransfers = queueTransfersMatching (const True) {- Adds transfers to queue for some of the known remotes, that match a - condition. Honors preferred content settings. -} -queueTransfersMatching :: (UUID -> Bool) -> Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () -queueTransfersMatching matching schedule q dstatus k f direction - | direction == Download = whenM (wantGet f) go +queueTransfersMatching :: (UUID -> Bool) -> Schedule -> Key -> AssociatedFile -> Direction -> Assistant () +queueTransfersMatching matching schedule k f direction + | direction == Download = whenM (liftAnnex $ wantGet f) go | otherwise = go - where - go = do - rs <- sufficientremotes - =<< syncRemotes <$> liftIO (getDaemonStatusOld dstatus) - let matchingrs = filter (matching . Remote.uuid) rs - if null matchingrs - then defer - else forM_ matchingrs $ \r -> liftIO $ - enqueue schedule q dstatus (gentransfer r) (stubInfo f r) - sufficientremotes rs - {- Queue downloads from all remotes that - - have the key, with the cheapest ones first. - - More expensive ones will only be tried if - - downloading from a cheap one fails. -} - | direction == Download = do - uuids <- Remote.keyLocations k - return $ filter (\r -> uuid r `elem` uuids) rs - {- Upload to all remotes that want the content. -} - | otherwise = filterM (wantSend f . Remote.uuid) $ - filter (not . Remote.readonly) rs - gentransfer r = Transfer - { transferDirection = direction - , transferKey = k - , transferUUID = Remote.uuid r - } - defer - {- Defer this download, as no known remote has the key. -} - | direction == Download = void $ liftIO $ atomically $ - modifyTVar' (deferreddownloads q) $ - \l -> (k, f):l - | otherwise = noop + where + go = do + rs <- liftAnnex . sufficientremotes + =<< syncRemotes <$> getDaemonStatus + let matchingrs = filter (matching . Remote.uuid) rs + if null matchingrs + then defer + else forM_ matchingrs $ \r -> + enqueue schedule (gentransfer r) (stubInfo f r) + sufficientremotes rs + {- Queue downloads from all remotes that + - have the key, with the cheapest ones first. + - More expensive ones will only be tried if + - downloading from a cheap one fails. -} + | direction == Download = do + uuids <- Remote.keyLocations k + return $ filter (\r -> uuid r `elem` uuids) rs + {- Upload to all remotes that want the content. -} + | otherwise = filterM (wantSend f . Remote.uuid) $ + filter (not . Remote.readonly) rs + gentransfer r = Transfer + { transferDirection = direction + , transferKey = k + , transferUUID = Remote.uuid r + } + defer + {- Defer this download, as no known remote has the key. -} + | direction == Download = do + q <- getAssistant transferQueue + void $ liftIO $ atomically $ + modifyTVar' (deferreddownloads q) $ + \l -> (k, f):l + | otherwise = noop {- Queues any deferred downloads that can now be accomplished, leaving - any others in the list to try again later. -} -queueDeferredDownloads :: Schedule -> TransferQueue -> DaemonStatusHandle -> Annex () -queueDeferredDownloads schedule q dstatus = do +queueDeferredDownloads :: Schedule -> Assistant () +queueDeferredDownloads schedule = do + q <- getAssistant transferQueue l <- liftIO $ atomically $ swapTVar (deferreddownloads q) [] - rs <- syncRemotes <$> liftIO (getDaemonStatusOld dstatus) + rs <- syncRemotes <$> getDaemonStatus left <- filterM (queue rs) l unless (null left) $ liftIO $ atomically $ modifyTVar' (deferreddownloads q) $ \new -> new ++ left - where - queue rs (k, f) = do - uuids <- Remote.keyLocations k - let sources = filter (\r -> uuid r `elem` uuids) rs - unless (null sources) $ - forM_ sources $ \r -> liftIO $ - enqueue schedule q dstatus - (gentransfer r) (stubInfo f r) - return $ null sources - where - gentransfer r = Transfer - { transferDirection = Download - , transferKey = k - , transferUUID = Remote.uuid r - } - -enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO () -enqueue schedule q dstatus t info + where + queue rs (k, f) = do + uuids <- liftAnnex $ Remote.keyLocations k + let sources = filter (\r -> uuid r `elem` uuids) rs + unless (null sources) $ + forM_ sources $ \r -> + enqueue schedule (gentransfer r) (stubInfo f r) + return $ null sources + where + gentransfer r = Transfer + { transferDirection = Download + , transferKey = k + , transferUUID = Remote.uuid r + } + +enqueue :: Schedule -> Transfer -> TransferInfo -> Assistant () +enqueue schedule t info | schedule == Next = go (new:) | otherwise = go (\l -> l++[new]) - where - new = (t, info) - go modlist = do - atomically $ do - void $ modifyTVar' (queuesize q) succ - void $ modifyTVar' (queuelist q) modlist - void $ notifyTransferOld dstatus + where + new = (t, info) + go modlist = do + q <- getAssistant transferQueue + liftIO $ atomically $ do + void $ modifyTVar' (queuesize q) succ + void $ modifyTVar' (queuelist q) modlist + notifyTransfer {- Adds a transfer to the queue. -} -queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () -queueTransfer schedule q dstatus f t remote = - enqueue schedule q dstatus t (stubInfo f remote) +queueTransfer :: Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant () +queueTransfer schedule f t remote = enqueue schedule t (stubInfo f remote) {- Blocks until the queue is no larger than a given size, and then adds a - transfer to the queue. -} -queueTransferAt :: Int -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () -queueTransferAt wantsz schedule q dstatus f t remote = do - atomically $ do +queueTransferAt :: Int -> Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant () +queueTransferAt wantsz schedule f t remote = do + q <- getAssistant transferQueue + liftIO $ atomically $ do sz <- readTVar (queuesize q) unless (sz <= wantsz) $ retry -- blocks until queuesize changes - enqueue schedule q dstatus t (stubInfo f remote) + enqueue schedule t (stubInfo f remote) -queueTransferWhenSmall :: TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () +queueTransferWhenSmall :: AssociatedFile -> Transfer -> Remote -> Assistant () queueTransferWhenSmall = queueTransferAt 10 Later {- Blocks until a pending transfer is available in the queue, @@ -151,38 +153,45 @@ queueTransferWhenSmall = queueTransferAt 10 Later - - This is done in a single STM transaction, so there is no window - where an observer sees an inconsistent status. -} -getNextTransfer :: TransferQueue -> DaemonStatusHandle -> (TransferInfo -> Bool) -> IO (Maybe (Transfer, TransferInfo)) -getNextTransfer q dstatus acceptable = atomically $ do - sz <- readTVar (queuesize q) - if sz < 1 - then retry -- blocks until queuesize changes - else do - (r@(t,info):rest) <- readTVar (queuelist q) - writeTVar (queuelist q) rest - void $ modifyTVar' (queuesize q) pred - if acceptable info - then do - adjustTransfersSTM dstatus $ - M.insertWith' const t info - return $ Just r - else return Nothing +getNextTransfer :: (TransferInfo -> Bool) -> Assistant (Maybe (Transfer, TransferInfo)) +getNextTransfer acceptable = do + q <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ atomically $ do + sz <- readTVar (queuesize q) + if sz < 1 + then retry -- blocks until queuesize changes + else do + (r@(t,info):rest) <- readTVar (queuelist q) + writeTVar (queuelist q) rest + void $ modifyTVar' (queuesize q) pred + if acceptable info + then do + adjustTransfersSTM dstatus $ + M.insertWith' const t info + return $ Just r + else return Nothing {- Moves transfers matching a condition from the queue, to the - currentTransfers map. -} -getMatchingTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) -> IO [(Transfer, TransferInfo)] -getMatchingTransfers q dstatus c = atomically $ do - ts <- dequeueTransfersSTM q c - unless (null ts) $ - adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts - return ts +getMatchingTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)] +getMatchingTransfers c = do + q <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ atomically $ do + ts <- dequeueTransfersSTM q c + unless (null ts) $ + adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts + return ts {- Removes transfers matching a condition from the queue, and returns the - removed transfers. -} -dequeueTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) -> IO [(Transfer, TransferInfo)] -dequeueTransfers q dstatus c = do - removed <- atomically $ dequeueTransfersSTM q c +dequeueTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)] +dequeueTransfers c = do + q <- getAssistant transferQueue + removed <- liftIO $ atomically $ dequeueTransfersSTM q c unless (null removed) $ - notifyTransferOld dstatus + notifyTransfer return removed dequeueTransfersSTM :: TransferQueue -> (Transfer -> Bool) -> STM [(Transfer, TransferInfo)] |