summaryrefslogtreecommitdiff
path: root/Assistant/TransferQueue.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant/TransferQueue.hs')
-rw-r--r--Assistant/TransferQueue.hs149
1 files changed, 149 insertions, 0 deletions
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
new file mode 100644
index 000000000..aa6192527
--- /dev/null
+++ b/Assistant/TransferQueue.hs
@@ -0,0 +1,149 @@
+{- git-annex assistant pending transfer queue
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.TransferQueue (
+ TransferQueue,
+ Schedule(..),
+ newTransferQueue,
+ getTransferQueue,
+ queueTransfers,
+ queueTransfer,
+ queueTransferAt,
+ getNextTransfer,
+ dequeueTransfer,
+) where
+
+import Common.Annex
+import Assistant.DaemonStatus
+import Logs.Transfer
+import Types.Remote
+import qualified Remote
+
+import Control.Concurrent.STM
+import qualified Data.Map as M
+
+{- The transfer queue consists of a channel listing the transfers to make;
+ - the size of the queue is also tracked, and a list is maintained
+ - in parallel to allow for reading. -}
+data TransferQueue = TransferQueue
+ { queue :: TChan (Transfer, TransferInfo)
+ , queuesize :: TVar Int
+ , queuelist :: TVar [(Transfer, TransferInfo)]
+ }
+
+data Schedule = Next | Later
+ deriving (Eq)
+
+newTransferQueue :: IO TransferQueue
+newTransferQueue = atomically $ TransferQueue
+ <$> newTChan
+ <*> newTVar 0
+ <*> newTVar []
+
+{- Reads the queue's content without blocking or changing it. -}
+getTransferQueue :: TransferQueue -> IO [(Transfer, TransferInfo)]
+getTransferQueue q = atomically $ readTVar $ queuelist q
+
+stubInfo :: AssociatedFile -> Remote -> TransferInfo
+stubInfo f r = TransferInfo
+ { startedTime = Nothing
+ , transferPid = Nothing
+ , transferTid = Nothing
+ , transferRemote = Just r
+ , bytesComplete = Nothing
+ , associatedFile = f
+ , transferPaused = False
+ }
+
+{- Adds transfers to queue for some of the known remotes. -}
+queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
+queueTransfers schedule q dstatus k f direction = do
+ rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus)
+ mapM_ go =<< sufficientremotes rs
+ where
+ sufficientremotes rs
+ -- Queue downloads from all remotes that
+ -- have the key, with the cheapest ones first.
+ -- More expensive ones will only be tried if
+ -- downloading from a cheap one fails.
+ | direction == Download = do
+ uuids <- Remote.keyLocations k
+ return $ filter (\r -> uuid r `elem` uuids) rs
+ -- TODO: Determine a smaller set of remotes that
+ -- can be uploaded to, in order to ensure all
+ -- remotes can access the content. Currently,
+ -- send to every remote we can.
+ | otherwise = return rs
+ gentransfer r = Transfer
+ { transferDirection = direction
+ , transferKey = k
+ , transferUUID = Remote.uuid r
+ }
+ go r = liftIO $
+ enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
+
+enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
+enqueue schedule q dstatus t info
+ | schedule == Next = go unGetTChan (new:)
+ | otherwise = go writeTChan (\l -> l++[new])
+ where
+ new = (t, info)
+ go modqueue modlist = do
+ atomically $ do
+ void $ modqueue (queue q) new
+ void $ modifyTVar' (queuesize q) succ
+ void $ modifyTVar' (queuelist q) modlist
+ void $ notifyTransfer dstatus
+
+{- Adds a transfer to the queue. -}
+queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransfer schedule q dstatus f t remote =
+ enqueue schedule q dstatus t (stubInfo f remote)
+
+{- Blocks until the queue is no larger than a given size, and then adds a
+ - transfer to the queue. -}
+queueTransferAt :: Int -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransferAt wantsz schedule q dstatus f t remote = do
+ atomically $ do
+ sz <- readTVar (queuesize q)
+ if sz <= wantsz
+ then return ()
+ else retry -- blocks until queuesize changes
+ enqueue schedule q dstatus t (stubInfo f remote)
+
+{- Blocks until a pending transfer is available from the queue,
+ - and removes it.
+ -
+ - Checks that it's acceptable, before adding it to the
+ - the currentTransfers map. If it's not acceptable, it's discarded.
+ -
+ - This is done in a single STM transaction, so there is no window
+ - where an observer sees an inconsistent status. -}
+getNextTransfer :: TransferQueue -> DaemonStatusHandle -> (TransferInfo -> Bool) -> IO (Maybe (Transfer, TransferInfo))
+getNextTransfer q dstatus acceptable = atomically $ do
+ void $ modifyTVar' (queuesize q) pred
+ void $ modifyTVar' (queuelist q) (drop 1)
+ r@(t, info) <- readTChan (queue q)
+ if acceptable info
+ then do
+ adjustTransfersSTM dstatus $
+ M.insertWith' const t info
+ return $ Just r
+ else return Nothing
+
+{- Removes a transfer from the queue, if present, and returns True if it
+ - was present. -}
+dequeueTransfer :: TransferQueue -> DaemonStatusHandle -> Transfer -> IO Bool
+dequeueTransfer q dstatus t = do
+ ok <- atomically $ do
+ (l, removed) <- partition (\i -> fst i /= t) <$> readTVar (queuelist q)
+ void $ writeTVar (queuesize q) (length l)
+ void $ writeTVar (queuelist q) l
+ return $ not $ null removed
+ when ok $
+ notifyTransfer dstatus
+ return ok