summaryrefslogtreecommitdiff
path: root/Annex
diff options
context:
space:
mode:
Diffstat (limited to 'Annex')
-rw-r--r--Annex/Transfer.hs61
1 files changed, 59 insertions, 2 deletions
diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs
index a78d82ef3..6ed8ca761 100644
--- a/Annex/Transfer.hs
+++ b/Annex/Transfer.hs
@@ -1,11 +1,11 @@
{- git-annex transfers
-
- - Copyright 2012-2014 Joey Hess <id@joeyh.name>
+ - Copyright 2012-2016 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
-{-# LANGUAGE CPP, FlexibleInstances #-}
+{-# LANGUAGE CPP, FlexibleInstances, BangPatterns #-}
module Annex.Transfer (
module X,
@@ -15,9 +15,11 @@ module Annex.Transfer (
alwaysRunTransfer,
noRetry,
forwardRetry,
+ pickRemote,
) where
import Annex.Common
+import qualified Annex
import Logs.Transfer as X
import Types.Transfer as X
import Annex.Notification as X
@@ -25,8 +27,10 @@ import Annex.Perms
import Utility.Metered
import Annex.LockPool
import Types.Remote (Verification(..))
+import qualified Types.Remote as Remote
import Control.Concurrent
+import qualified Data.Set as S
class Observable a where
observeBool :: a -> Bool
@@ -166,3 +170,56 @@ noRetry _ _ = False
- to send some data. -}
forwardRetry :: RetryDecider
forwardRetry old new = bytesComplete old < bytesComplete new
+
+{- Picks a remote from the list and tries a transfer to it. If the transfer
+ - does not succeed, goes on to try other remotes from the list.
+ -
+ - The list should already be ordered by remote cost, and is normally
+ - tried in order. However, when concurrent jobs are running, they will
+ - be assigned different remotes of the same cost when possible. This can
+ - increase total transfer speed.
+ -}
+pickRemote :: Observable v => [Remote] -> (Remote -> Annex v) -> Annex v
+pickRemote l a = go l =<< Annex.getState Annex.concurrentjobs
+ where
+ go [] _ = return observeFailure
+ go (r:[]) _ = a r
+ go rs (Just n) | n > 1 = do
+ mv <- Annex.getState Annex.activeremotes
+ active <- liftIO $ takeMVar mv
+ let rs' = sortBy (inactiveFirst active) rs
+ goconcurrent mv active rs'
+ go (r:rs) _ = do
+ ok <- a r
+ if observeBool ok
+ then return ok
+ else go rs Nothing
+ goconcurrent mv active [] = do
+ liftIO $ putMVar mv active
+ return observeFailure
+ goconcurrent mv active (r:rs) = do
+ let !active' = S.insert r active
+ liftIO $ putMVar mv active'
+ let getnewactive = do
+ active'' <- liftIO $ takeMVar mv
+ let !active''' = S.delete r active''
+ return active'''
+ let removeactive = liftIO . putMVar mv =<< getnewactive
+ ok <- a r `onException` removeactive
+ if observeBool ok
+ then do
+ removeactive
+ return ok
+ else do
+ active'' <- getnewactive
+ -- Re-sort the remaining rs
+ -- because other threads could have
+ -- been assigned them in the meantime.
+ let rs' = sortBy (inactiveFirst active'') rs
+ goconcurrent mv active'' rs'
+
+inactiveFirst :: S.Set Remote -> Remote -> Remote -> Ordering
+inactiveFirst active a b
+ | Remote.cost a == Remote.cost b =
+ if a `S.member` active then GT else LT
+ | otherwise = compare a b