summaryrefslogtreecommitdiff
path: root/Assistant/TransferQueue.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant/TransferQueue.hs')
-rw-r--r--Assistant/TransferQueue.hs56
1 files changed, 34 insertions, 22 deletions
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index 2f09813eb..51ed5c9c7 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -23,6 +23,7 @@ import Types.Remote
import qualified Remote
import Control.Concurrent.STM
+import qualified Data.Map as M
{- The transfer queue consists of a channel listing the transfers to make;
- the size of the queue is also tracked, and a list is maintained
@@ -58,8 +59,8 @@ stubInfo f r = TransferInfo
{- Adds transfers to queue for some of the known remotes. -}
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
-queueTransfers schedule q daemonstatus k f direction = do
- rs <- knownRemotes <$> liftIO (getDaemonStatus daemonstatus)
+queueTransfers schedule q dstatus k f direction = do
+ rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus)
mapM_ go =<< sufficientremotes rs
where
sufficientremotes rs
@@ -80,37 +81,48 @@ queueTransfers schedule q daemonstatus k f direction = do
, transferKey = k
, transferUUID = Remote.uuid r
}
- go r = liftIO $ atomically $
- enqueue schedule q (gentransfer r) (stubInfo f r)
+ go r = liftIO $
+ enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
-enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM ()
-enqueue schedule q t info
+enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
+enqueue schedule q dstatus t info
| schedule == Next = go unGetTChan (new:)
| otherwise = go writeTChan (\l -> l++[new])
where
new = (t, info)
go modqueue modlist = do
- void $ modqueue (queue q) new
- void $ modifyTVar' (queuesize q) succ
- void $ modifyTVar' (queuelist q) modlist
+ atomically $ do
+ void $ modqueue (queue q) new
+ void $ modifyTVar' (queuesize q) succ
+ void $ modifyTVar' (queuelist q) modlist
+ void $ notifyDaemonStatusChange dstatus
{- Adds a transfer to the queue. -}
-queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO ()
-queueTransfer schedule q f t remote = atomically $
- enqueue schedule q t (stubInfo f remote)
+queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransfer schedule q dstatus f t remote =
+ enqueue schedule q dstatus t (stubInfo f remote)
{- Blocks until the queue is no larger than a given size, and then adds a
- transfer to the queue. -}
-queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO ()
-queueTransferAt wantsz schedule q f t remote = atomically $ do
- sz <- readTVar (queuesize q)
- if sz <= wantsz
- then enqueue schedule q t (stubInfo f remote)
- else retry -- blocks until queuesize changes
+queueTransferAt :: Integer -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransferAt wantsz schedule q dstatus f t remote = do
+ atomically $ do
+ sz <- readTVar (queuesize q)
+ if sz <= wantsz
+ then return ()
+ else retry -- blocks until queuesize changes
+ enqueue schedule q dstatus t (stubInfo f remote)
-{- Blocks until a pending transfer is available from the queue. -}
-getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)
-getNextTransfer q = atomically $ do
+{- Blocks until a pending transfer is available from the queue.
+ - The transfer is removed from the transfer queue, and added to
+ - the daemon status currentTransfers map. This is done in a single STM
+ - transaction, so there is no window where an observer sees an
+ - inconsistent status. -}
+getNextTransfer :: TransferQueue -> DaemonStatusHandle -> IO (Transfer, TransferInfo)
+getNextTransfer q dstatus = atomically $ do
void $ modifyTVar' (queuesize q) pred
void $ modifyTVar' (queuelist q) (drop 1)
- readTChan (queue q)
+ r@(t, info) <- readTChan (queue q)
+ adjustTransfersSTM dstatus $
+ M.insertWith' const t info
+ return r