aboutsummaryrefslogtreecommitdiff
path: root/Annex
diff options
context:
space:
mode:
Diffstat (limited to 'Annex')
-rw-r--r--Annex/Transfer.hs56
1 files changed, 38 insertions, 18 deletions
diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs
index 3fcf1a1b9..35294ba2b 100644
--- a/Annex/Transfer.hs
+++ b/Annex/Transfer.hs
@@ -32,7 +32,9 @@ import qualified Types.Remote as Remote
import Types.Concurrency
import Control.Concurrent
+import Control.Concurrent.STM
import qualified Data.Map.Strict as M
+import qualified Data.Set as S
import Data.Ord
class Observable a where
@@ -89,22 +91,23 @@ alwaysRunTransfer :: Observable v => Transfer -> AssociatedFile -> RetryDecider
alwaysRunTransfer = runTransfer' True
runTransfer' :: Observable v => Bool -> Transfer -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v
-runTransfer' ignorelock t afile shouldretry transferaction = checkSecureHashes t $ do
- info <- liftIO $ startTransferInfo afile
- (meter, tfile, metervar) <- mkProgressUpdater t info
- mode <- annexFileMode
- (lck, inprogress) <- prep tfile mode info
- if inprogress && not ignorelock
- then do
- showNote "transfer already in progress, or unable to take transfer lock"
- return observeFailure
- else do
- v <- retry info metervar $ transferaction meter
- liftIO $ cleanup tfile lck
- if observeBool v
- then removeFailedTransfer t
- else recordFailedTransfer t info
- return v
+runTransfer' ignorelock t afile shouldretry transferaction =
+ checkSecureHashes t $ currentProcessTransfer t $ do
+ info <- liftIO $ startTransferInfo afile
+ (meter, tfile, metervar) <- mkProgressUpdater t info
+ mode <- annexFileMode
+ (lck, inprogress) <- prep tfile mode info
+ if inprogress && not ignorelock
+ then do
+ showNote "transfer already in progress, or unable to take transfer lock"
+ return observeFailure
+ else do
+ v <- handleretry info metervar $ transferaction meter
+ liftIO $ cleanup tfile lck
+ if observeBool v
+ then removeFailedTransfer t
+ else recordFailedTransfer t info
+ return v
where
#ifndef mingw32_HOST_OS
prep tfile mode info = catchPermissionDenied (const prepfailed) $ do
@@ -153,7 +156,7 @@ runTransfer' ignorelock t afile shouldretry transferaction = checkSecureHashes t
dropLock lockhandle
void $ tryIO $ removeFile lck
#endif
- retry oldinfo metervar run = do
+ handleretry oldinfo metervar run = do
v <- tryNonAsync run
case v of
Right b -> return b
@@ -162,7 +165,7 @@ runTransfer' ignorelock t afile shouldretry transferaction = checkSecureHashes t
b <- getbytescomplete metervar
let newinfo = oldinfo { bytesComplete = Just b }
if shouldretry oldinfo newinfo
- then retry newinfo metervar run
+ then handleretry newinfo metervar run
else return observeFailure
getbytescomplete metervar
| transferDirection t == Upload =
@@ -256,3 +259,20 @@ lessActiveFirst :: M.Map Remote Integer -> Remote -> Remote -> Ordering
lessActiveFirst active a b
| Remote.cost a == Remote.cost b = comparing (`M.lookup` active) a b
| otherwise = compare a b
+
+{- Runs a transfer action. Only one thread can run for a given Transfer
+ - at a time; other threads will block. -}
+currentProcessTransfer :: Transfer -> Annex a -> Annex a
+currentProcessTransfer t a = go =<< Annex.getState Annex.concurrency
+ where
+ go NonConcurrent = a
+ go (Concurrent _) = do
+ tv <- Annex.getState Annex.currentprocesstransfers
+ bracket_ (setup tv) (cleanup tv) a
+ setup tv = liftIO $ atomically $ do
+ s <- readTVar tv
+ if S.member t s
+ then retry
+ else writeTVar tv $! S.insert t s
+ cleanup tv = liftIO $ atomically $
+ modifyTVar' tv $ S.delete t