diff options
Diffstat (limited to 'Assistant/TransferQueue.hs')
-rw-r--r-- | Assistant/TransferQueue.hs | 56 |
1 files changed, 34 insertions, 22 deletions
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index 2f09813eb..51ed5c9c7 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -23,6 +23,7 @@ import Types.Remote import qualified Remote import Control.Concurrent.STM +import qualified Data.Map as M {- The transfer queue consists of a channel listing the transfers to make; - the size of the queue is also tracked, and a list is maintained @@ -58,8 +59,8 @@ stubInfo f r = TransferInfo {- Adds transfers to queue for some of the known remotes. -} queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () -queueTransfers schedule q daemonstatus k f direction = do - rs <- knownRemotes <$> liftIO (getDaemonStatus daemonstatus) +queueTransfers schedule q dstatus k f direction = do + rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus) mapM_ go =<< sufficientremotes rs where sufficientremotes rs @@ -80,37 +81,48 @@ queueTransfers schedule q daemonstatus k f direction = do , transferKey = k , transferUUID = Remote.uuid r } - go r = liftIO $ atomically $ - enqueue schedule q (gentransfer r) (stubInfo f r) + go r = liftIO $ + enqueue schedule q dstatus (gentransfer r) (stubInfo f r) -enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM () -enqueue schedule q t info +enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO () +enqueue schedule q dstatus t info | schedule == Next = go unGetTChan (new:) | otherwise = go writeTChan (\l -> l++[new]) where new = (t, info) go modqueue modlist = do - void $ modqueue (queue q) new - void $ modifyTVar' (queuesize q) succ - void $ modifyTVar' (queuelist q) modlist + atomically $ do + void $ modqueue (queue q) new + void $ modifyTVar' (queuesize q) succ + void $ modifyTVar' (queuelist q) modlist + void $ notifyDaemonStatusChange dstatus {- Adds a transfer to the queue. -} -queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO () -queueTransfer schedule q f t remote = atomically $ - enqueue schedule q t (stubInfo f remote) +queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () +queueTransfer schedule q dstatus f t remote = + enqueue schedule q dstatus t (stubInfo f remote) {- Blocks until the queue is no larger than a given size, and then adds a - transfer to the queue. -} -queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO () -queueTransferAt wantsz schedule q f t remote = atomically $ do - sz <- readTVar (queuesize q) - if sz <= wantsz - then enqueue schedule q t (stubInfo f remote) - else retry -- blocks until queuesize changes +queueTransferAt :: Integer -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () +queueTransferAt wantsz schedule q dstatus f t remote = do + atomically $ do + sz <- readTVar (queuesize q) + if sz <= wantsz + then return () + else retry -- blocks until queuesize changes + enqueue schedule q dstatus t (stubInfo f remote) -{- Blocks until a pending transfer is available from the queue. -} -getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo) -getNextTransfer q = atomically $ do +{- Blocks until a pending transfer is available from the queue. + - The transfer is removed from the transfer queue, and added to + - the daemon status currentTransfers map. This is done in a single STM + - transaction, so there is no window where an observer sees an + - inconsistent status. -} +getNextTransfer :: TransferQueue -> DaemonStatusHandle -> IO (Transfer, TransferInfo) +getNextTransfer q dstatus = atomically $ do void $ modifyTVar' (queuesize q) pred void $ modifyTVar' (queuelist q) (drop 1) - readTChan (queue q) + r@(t, info) <- readTChan (queue q) + adjustTransfersSTM dstatus $ + M.insertWith' const t info + return r |