From a9dbfdf28d6c97c636e58be85f68d2a3f6efef77 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 25 Jul 2012 13:12:34 -0400 Subject: better transfer queue management Allow transfers to be added with blocking until the queue is sufficiently small. Better control over which end of the queue to add a transfer to. --- Assistant/Threads/Committer.hs | 2 +- Assistant/Threads/TransferScanner.hs | 13 ++++++-- Assistant/Threads/Watcher.hs | 2 +- Assistant/TransferQueue.hs | 60 ++++++++++++++++++++++++------------ 4 files changed, 52 insertions(+), 25 deletions(-) (limited to 'Assistant') diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index ffb249404..33b92c7e5 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -161,7 +161,7 @@ handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds sha <- inRepo $ Git.HashObject.hashObject BlobObject link stageSymlink file sha - queueTransfers transferqueue dstatus key (Just file) Upload + queueTransfers Next transferqueue dstatus key (Just file) Upload showEndOk return $ Just change diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs index 485506e7d..3c2e8dfab 100644 --- a/Assistant/Threads/TransferScanner.hs +++ b/Assistant/Threads/TransferScanner.hs @@ -18,16 +18,23 @@ import Utility.ThreadScheduler thisThread :: ThreadName thisThread = "TransferScanner" -{- This thread scans remotes, to find transfers that need to be made to - - keep their data in sync. The transfers are queued with low priority. -} +{- This thread waits until a remote needs to be scanned, to find transfers + - that need to be made, to keep data in sync. + - + - Remotes are scanned in the background; the scan is blocked when the + - transfer queue gets too large. + -} transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO () transferScannerThread st scanremotes transferqueue = do runEvery (Seconds 2) $ do r <- getScanRemote scanremotes needtransfer <- scan st r forM_ needtransfer $ \(f, t) -> - queueLaterTransfer transferqueue f t + queueTransferAt smallsize Later transferqueue f t + where + smallsize = 10 +{- -} scan :: ThreadState -> Remote -> IO [(AssociatedFile, Transfer)] scan st r = do debug thisThread ["scanning", show r] diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index 617e6d77c..31025361b 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -206,7 +206,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l - try to get the key's content. -} checkcontent key daemonstatus | scanComplete daemonstatus = unlessM (inAnnex key) $ - queueTransfers transferqueue dstatus + queueTransfers Next transferqueue dstatus key (Just file) Download | otherwise = noop diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index f8104914c..1fb0bfa37 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -15,10 +15,18 @@ import qualified Remote import Control.Concurrent.STM -type TransferQueue = TChan (Transfer, TransferInfo) +{- The transfer queue consists of a channel listing the transfers to make; + - the size of the queue is also tracked -} +data TransferQueue = TransferQueue + { queue :: TChan (Transfer, TransferInfo) + , queuesize :: TVar Integer + } + +data Schedule = Next | Later + deriving (Eq) newTransferQueue :: IO TransferQueue -newTransferQueue = atomically newTChan +newTransferQueue = atomically $ TransferQueue <$> newTChan <*> newTVar 0 stubInfo :: AssociatedFile -> TransferInfo stubInfo f = TransferInfo @@ -30,13 +38,11 @@ stubInfo f = TransferInfo , associatedFile = f } -{- Adds pending transfers to the end of the queue for some of the known - - remotes. -} -queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () -queueTransfers q daemonstatus k f direction = do +{- Adds pending transfers to queue for some of the known remotes. -} +queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () +queueTransfers schedule q daemonstatus k f direction = do rs <- knownRemotes <$> getDaemonStatus daemonstatus - mapM_ (\r -> queue r $ gentransfer r) - =<< sufficientremotes rs + mapM_ go =<< sufficientremotes rs where sufficientremotes rs -- Queue downloads from all remotes that @@ -56,20 +62,34 @@ queueTransfers q daemonstatus k f direction = do , transferKey = k , transferUUID = Remote.uuid r } - queue r t = liftIO $ void $ atomically $ do + go r = liftIO $ atomically $ do let info = (stubInfo f) { transferRemote = Just r } - writeTChan q (t, info) + enqueue schedule q (gentransfer r) info + +enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM () +enqueue schedule q t info + | schedule == Next = go unGetTChan + | otherwise = go writeTChan + where + go a = do + void $ a (queue q) (t, info) + void $ modifyTVar' (queuesize q) succ -{- Adds a transfer to the end of the queue, to be processed later. -} -queueLaterTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () -queueLaterTransfer q f t = void $ atomically $ - writeTChan q (t, stubInfo f) +{- Adds a transfer to the queue. -} +queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> IO () +queueTransfer schedule q f t = atomically $ enqueue schedule q t (stubInfo f) -{- Adds a transfer to the start of the queue, to be processed next. -} -queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () -queueNextTransfer q f t = void $ atomically $ - unGetTChan q (t, stubInfo f) +{- Blocks until the queue is no larger than a given size, and then adds a + - transfer to the queue. -} +queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> IO () +queueTransferAt wantsz schedule q f t = atomically $ do + sz <- readTVar (queuesize q) + if sz <= wantsz + then enqueue schedule q t (stubInfo f) + else retry -- blocks until queuesize changes -{- Blocks until a pending transfer is available in the queue. -} +{- Blocks until a pending transfer is available from the queue. -} getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo) -getNextTransfer = atomically . readTChan +getNextTransfer q = atomically $ do + void $ modifyTVar' (queuesize q) pred + readTChan (queue q) -- cgit v1.2.3