diff options
Diffstat (limited to 'Annex')
-rw-r--r-- | Annex/Transfer.hs | 61 |
1 files changed, 59 insertions, 2 deletions
diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index a78d82ef3..6ed8ca761 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -1,11 +1,11 @@ {- git-annex transfers - - - Copyright 2012-2014 Joey Hess <id@joeyh.name> + - Copyright 2012-2016 Joey Hess <id@joeyh.name> - - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE CPP, FlexibleInstances #-} +{-# LANGUAGE CPP, FlexibleInstances, BangPatterns #-} module Annex.Transfer ( module X, @@ -15,9 +15,11 @@ module Annex.Transfer ( alwaysRunTransfer, noRetry, forwardRetry, + pickRemote, ) where import Annex.Common +import qualified Annex import Logs.Transfer as X import Types.Transfer as X import Annex.Notification as X @@ -25,8 +27,10 @@ import Annex.Perms import Utility.Metered import Annex.LockPool import Types.Remote (Verification(..)) +import qualified Types.Remote as Remote import Control.Concurrent +import qualified Data.Set as S class Observable a where observeBool :: a -> Bool @@ -166,3 +170,56 @@ noRetry _ _ = False - to send some data. -} forwardRetry :: RetryDecider forwardRetry old new = bytesComplete old < bytesComplete new + +{- Picks a remote from the list and tries a transfer to it. If the transfer + - does not succeed, goes on to try other remotes from the list. + - + - The list should already be ordered by remote cost, and is normally + - tried in order. However, when concurrent jobs are running, they will + - be assigned different remotes of the same cost when possible. This can + - increase total transfer speed. + -} +pickRemote :: Observable v => [Remote] -> (Remote -> Annex v) -> Annex v +pickRemote l a = go l =<< Annex.getState Annex.concurrentjobs + where + go [] _ = return observeFailure + go (r:[]) _ = a r + go rs (Just n) | n > 1 = do + mv <- Annex.getState Annex.activeremotes + active <- liftIO $ takeMVar mv + let rs' = sortBy (inactiveFirst active) rs + goconcurrent mv active rs' + go (r:rs) _ = do + ok <- a r + if observeBool ok + then return ok + else go rs Nothing + goconcurrent mv active [] = do + liftIO $ putMVar mv active + return observeFailure + goconcurrent mv active (r:rs) = do + let !active' = S.insert r active + liftIO $ putMVar mv active' + let getnewactive = do + active'' <- liftIO $ takeMVar mv + let !active''' = S.delete r active'' + return active''' + let removeactive = liftIO . putMVar mv =<< getnewactive + ok <- a r `onException` removeactive + if observeBool ok + then do + removeactive + return ok + else do + active'' <- getnewactive + -- Re-sort the remaining rs + -- because other threads could have + -- been assigned them in the meantime. + let rs' = sortBy (inactiveFirst active'') rs + goconcurrent mv active'' rs' + +inactiveFirst :: S.Set Remote -> Remote -> Remote -> Ordering +inactiveFirst active a b + | Remote.cost a == Remote.cost b = + if a `S.member` active then GT else LT + | otherwise = compare a b |