diff options
author | Joey Hess <joey@kitenet.net> | 2012-07-25 13:12:34 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-07-25 13:12:34 -0400 |
commit | a9dbfdf28d6c97c636e58be85f68d2a3f6efef77 (patch) | |
tree | 667c010c9a933535a37d824b424fcae0c2bc35d4 /Assistant/TransferQueue.hs | |
parent | 6107328a6b981ec8130e4154be1ebe7bc11979df (diff) |
better transfer queue management
Allow transfers to be added with blocking until the queue is sufficiently
small.
Better control over which end of the queue to add a transfer to.
Diffstat (limited to 'Assistant/TransferQueue.hs')
-rw-r--r-- | Assistant/TransferQueue.hs | 60 |
1 files changed, 40 insertions, 20 deletions
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index f8104914c..1fb0bfa37 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -15,10 +15,18 @@ import qualified Remote import Control.Concurrent.STM -type TransferQueue = TChan (Transfer, TransferInfo) +{- The transfer queue consists of a channel listing the transfers to make; + - the size of the queue is also tracked -} +data TransferQueue = TransferQueue + { queue :: TChan (Transfer, TransferInfo) + , queuesize :: TVar Integer + } + +data Schedule = Next | Later + deriving (Eq) newTransferQueue :: IO TransferQueue -newTransferQueue = atomically newTChan +newTransferQueue = atomically $ TransferQueue <$> newTChan <*> newTVar 0 stubInfo :: AssociatedFile -> TransferInfo stubInfo f = TransferInfo @@ -30,13 +38,11 @@ stubInfo f = TransferInfo , associatedFile = f } -{- Adds pending transfers to the end of the queue for some of the known - - remotes. -} -queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () -queueTransfers q daemonstatus k f direction = do +{- Adds pending transfers to queue for some of the known remotes. -} +queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () +queueTransfers schedule q daemonstatus k f direction = do rs <- knownRemotes <$> getDaemonStatus daemonstatus - mapM_ (\r -> queue r $ gentransfer r) - =<< sufficientremotes rs + mapM_ go =<< sufficientremotes rs where sufficientremotes rs -- Queue downloads from all remotes that @@ -56,20 +62,34 @@ queueTransfers q daemonstatus k f direction = do , transferKey = k , transferUUID = Remote.uuid r } - queue r t = liftIO $ void $ atomically $ do + go r = liftIO $ atomically $ do let info = (stubInfo f) { transferRemote = Just r } - writeTChan q (t, info) + enqueue schedule q (gentransfer r) info + +enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM () +enqueue schedule q t info + | schedule == Next = go unGetTChan + | otherwise = go writeTChan + where + go a = do + void $ a (queue q) (t, info) + void $ modifyTVar' (queuesize q) succ -{- Adds a transfer to the end of the queue, to be processed later. -} -queueLaterTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () -queueLaterTransfer q f t = void $ atomically $ - writeTChan q (t, stubInfo f) +{- Adds a transfer to the queue. -} +queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> IO () +queueTransfer schedule q f t = atomically $ enqueue schedule q t (stubInfo f) -{- Adds a transfer to the start of the queue, to be processed next. -} -queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () -queueNextTransfer q f t = void $ atomically $ - unGetTChan q (t, stubInfo f) +{- Blocks until the queue is no larger than a given size, and then adds a + - transfer to the queue. -} +queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> IO () +queueTransferAt wantsz schedule q f t = atomically $ do + sz <- readTVar (queuesize q) + if sz <= wantsz + then enqueue schedule q t (stubInfo f) + else retry -- blocks until queuesize changes -{- Blocks until a pending transfer is available in the queue. -} +{- Blocks until a pending transfer is available from the queue. -} getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo) -getNextTransfer = atomically . readTChan +getNextTransfer q = atomically $ do + void $ modifyTVar' (queuesize q) pred + readTChan (queue q) |