summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant/Threads/Committer.hs2
-rw-r--r--Assistant/Threads/TransferScanner.hs13
-rw-r--r--Assistant/Threads/Watcher.hs2
-rw-r--r--Assistant/TransferQueue.hs60
4 files changed, 52 insertions, 25 deletions
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
index ffb249404..33b92c7e5 100644
--- a/Assistant/Threads/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -161,7 +161,7 @@ handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
- queueTransfers transferqueue dstatus key (Just file) Upload
+ queueTransfers Next transferqueue dstatus key (Just file) Upload
showEndOk
return $ Just change
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
index 485506e7d..3c2e8dfab 100644
--- a/Assistant/Threads/TransferScanner.hs
+++ b/Assistant/Threads/TransferScanner.hs
@@ -18,16 +18,23 @@ import Utility.ThreadScheduler
thisThread :: ThreadName
thisThread = "TransferScanner"
-{- This thread scans remotes, to find transfers that need to be made to
- - keep their data in sync. The transfers are queued with low priority. -}
+{- This thread waits until a remote needs to be scanned, to find transfers
+ - that need to be made, to keep data in sync.
+ -
+ - Remotes are scanned in the background; the scan is blocked when the
+ - transfer queue gets too large.
+ -}
transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO ()
transferScannerThread st scanremotes transferqueue = do
runEvery (Seconds 2) $ do
r <- getScanRemote scanremotes
needtransfer <- scan st r
forM_ needtransfer $ \(f, t) ->
- queueLaterTransfer transferqueue f t
+ queueTransferAt smallsize Later transferqueue f t
+ where
+ smallsize = 10
+{- -}
scan :: ThreadState -> Remote -> IO [(AssociatedFile, Transfer)]
scan st r = do
debug thisThread ["scanning", show r]
diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs
index 617e6d77c..31025361b 100644
--- a/Assistant/Threads/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -206,7 +206,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
- try to get the key's content. -}
checkcontent key daemonstatus
| scanComplete daemonstatus = unlessM (inAnnex key) $
- queueTransfers transferqueue dstatus
+ queueTransfers Next transferqueue dstatus
key (Just file) Download
| otherwise = noop
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index f8104914c..1fb0bfa37 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -15,10 +15,18 @@ import qualified Remote
import Control.Concurrent.STM
-type TransferQueue = TChan (Transfer, TransferInfo)
+{- The transfer queue consists of a channel listing the transfers to make;
+ - the size of the queue is also tracked -}
+data TransferQueue = TransferQueue
+ { queue :: TChan (Transfer, TransferInfo)
+ , queuesize :: TVar Integer
+ }
+
+data Schedule = Next | Later
+ deriving (Eq)
newTransferQueue :: IO TransferQueue
-newTransferQueue = atomically newTChan
+newTransferQueue = atomically $ TransferQueue <$> newTChan <*> newTVar 0
stubInfo :: AssociatedFile -> TransferInfo
stubInfo f = TransferInfo
@@ -30,13 +38,11 @@ stubInfo f = TransferInfo
, associatedFile = f
}
-{- Adds pending transfers to the end of the queue for some of the known
- - remotes. -}
-queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
-queueTransfers q daemonstatus k f direction = do
+{- Adds pending transfers to queue for some of the known remotes. -}
+queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
+queueTransfers schedule q daemonstatus k f direction = do
rs <- knownRemotes <$> getDaemonStatus daemonstatus
- mapM_ (\r -> queue r $ gentransfer r)
- =<< sufficientremotes rs
+ mapM_ go =<< sufficientremotes rs
where
sufficientremotes rs
-- Queue downloads from all remotes that
@@ -56,20 +62,34 @@ queueTransfers q daemonstatus k f direction = do
, transferKey = k
, transferUUID = Remote.uuid r
}
- queue r t = liftIO $ void $ atomically $ do
+ go r = liftIO $ atomically $ do
let info = (stubInfo f) { transferRemote = Just r }
- writeTChan q (t, info)
+ enqueue schedule q (gentransfer r) info
+
+enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM ()
+enqueue schedule q t info
+ | schedule == Next = go unGetTChan
+ | otherwise = go writeTChan
+ where
+ go a = do
+ void $ a (queue q) (t, info)
+ void $ modifyTVar' (queuesize q) succ
-{- Adds a transfer to the end of the queue, to be processed later. -}
-queueLaterTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
-queueLaterTransfer q f t = void $ atomically $
- writeTChan q (t, stubInfo f)
+{- Adds a transfer to the queue. -}
+queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueTransfer schedule q f t = atomically $ enqueue schedule q t (stubInfo f)
-{- Adds a transfer to the start of the queue, to be processed next. -}
-queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
-queueNextTransfer q f t = void $ atomically $
- unGetTChan q (t, stubInfo f)
+{- Blocks until the queue is no larger than a given size, and then adds a
+ - transfer to the queue. -}
+queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueTransferAt wantsz schedule q f t = atomically $ do
+ sz <- readTVar (queuesize q)
+ if sz <= wantsz
+ then enqueue schedule q t (stubInfo f)
+ else retry -- blocks until queuesize changes
-{- Blocks until a pending transfer is available in the queue. -}
+{- Blocks until a pending transfer is available from the queue. -}
getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)
-getNextTransfer = atomically . readTChan
+getNextTransfer q = atomically $ do
+ void $ modifyTVar' (queuesize q) pred
+ readTChan (queue q)