summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant/ScanRemotes.hs27
-rw-r--r--Assistant/Sync.hs19
-rw-r--r--Assistant/Threads/TransferScanner.hs44
-rw-r--r--Assistant/TransferQueue.hs4
-rw-r--r--Command/Move.hs9
-rw-r--r--Command/RecvKey.hs5
-rw-r--r--Command/SendKey.hs13
-rw-r--r--Fields.hs6
-rw-r--r--Logs/Transfer.hs69
9 files changed, 132 insertions, 64 deletions
diff --git a/Assistant/ScanRemotes.hs b/Assistant/ScanRemotes.hs
index 524dc200b..2920e89c3 100644
--- a/Assistant/ScanRemotes.hs
+++ b/Assistant/ScanRemotes.hs
@@ -14,9 +14,12 @@ import Data.Function
import Control.Concurrent.STM
import qualified Data.Map as M
-type Priority = Int
+data ScanInfo = ScanInfo
+ { scanPriority :: Int
+ , fullScan :: Bool
+ }
-type ScanRemoteMap = TMVar (M.Map Remote Priority)
+type ScanRemoteMap = TMVar (M.Map Remote ScanInfo)
{- The TMVar starts empty, and is left empty when there are no remotes
- to scan. -}
@@ -25,21 +28,23 @@ newScanRemoteMap = atomically newEmptyTMVar
{- Blocks until there is a remote that needs to be scanned.
- Processes higher priority remotes first. -}
-getScanRemote :: ScanRemoteMap -> IO Remote
+getScanRemote :: ScanRemoteMap -> IO (Remote, ScanInfo)
getScanRemote v = atomically $ do
m <- takeTMVar v
- let l = reverse $ map fst $ sortBy (compare `on` snd) $ M.toList m
+ let l = reverse $ sortBy (compare `on` scanPriority . snd) $ M.toList m
case l of
[] -> retry -- should never happen
- (newest:_) -> do
- let m' = M.delete newest m
+ (ret@(r, _):_) -> do
+ let m' = M.delete r m
unless (M.null m') $
putTMVar v m'
- return newest
+ return ret
{- Adds new remotes that need scanning to the map. -}
-addScanRemotes :: ScanRemoteMap -> [Remote] -> IO ()
-addScanRemotes _ [] = noop
-addScanRemotes v rs = atomically $ do
+addScanRemotes :: ScanRemoteMap -> [Remote] -> Bool -> IO ()
+addScanRemotes _ [] _ = noop
+addScanRemotes v rs full = atomically $ do
m <- fromMaybe M.empty <$> tryTakeTMVar v
- putTMVar v $ M.union m $ M.fromList $ map (\r -> (r, Remote.cost r)) rs
+ putTMVar v $ M.union (M.fromList $ zip rs (map info rs)) m
+ where
+ info r = ScanInfo (Remote.cost r) full
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs
index 42f82e9ab..6a586e097 100644
--- a/Assistant/Sync.hs
+++ b/Assistant/Sync.hs
@@ -26,10 +26,11 @@ import qualified Data.Map as M
{- Syncs with remotes that may have been disconnected for a while.
-
- - After getting git in sync, queues a scan for file transfers.
- - To avoid doing that expensive scan unnecessarily, it's only run
- - if the git-annex branches of the remotes have diverged from the
- - local git-annex branch.
+ - First gets git in sync, and then prepares any necessary file transfers.
+ -
+ - An expensive full scan is queued when the git-annex branches of the
+ - remotes have diverged from the local git-annex branch. Otherwise,
+ - it's sufficient to requeue failed transfers.
-}
reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO ()
reconnectRemotes _ _ _ _ [] = noop
@@ -38,16 +39,14 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $
sync =<< runThreadState st (inRepo Git.Branch.current)
where
sync (Just branch) = do
- haddiverged <- manualPull st (Just branch) rs
- when haddiverged $
- addScanRemotes scanremotes rs
+ diverged <- manualPull st (Just branch) rs
+ addScanRemotes scanremotes rs diverged
now <- getCurrentTime
pushToRemotes threadname now st Nothing rs
{- No local branch exists yet, but we can try pulling. -}
sync Nothing = do
- haddiverged <- manualPull st Nothing rs
- when haddiverged $
- addScanRemotes scanremotes rs
+ diverged <- manualPull st Nothing rs
+ addScanRemotes scanremotes rs diverged
return True
{- Updates the local sync branch, then pushes it to all remotes, in
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
index 6bef2a6f1..b3222edb4 100644
--- a/Assistant/Threads/TransferScanner.hs
+++ b/Assistant/Threads/TransferScanner.hs
@@ -30,24 +30,45 @@ thisThread = "TransferScanner"
transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO ()
transferScannerThread st dstatus scanremotes transferqueue = do
runEvery (Seconds 2) $ do
- r <- getScanRemote scanremotes
- liftIO $ debug thisThread ["starting scan of", show r]
- void $ alertWhile dstatus (scanAlert r) $
- scan st dstatus transferqueue r
- liftIO $ debug thisThread ["finished scan of", show r]
+ (r, info) <- getScanRemote scanremotes
+ scanned <- runThreadState st $ inRepo $
+ checkTransferScanned $ Remote.uuid r
+ if not scanned || fullScan info
+ then do
+ liftIO $ debug thisThread ["starting scan of", show r]
+ void $ alertWhile dstatus (scanAlert r) $
+ expensiveScan st dstatus transferqueue r
+ liftIO $ debug thisThread ["finished scan of", show r]
+ runThreadState st $ inRepo $
+ transferScanned $ Remote.uuid r
+ else failedTransferScan st dstatus transferqueue r
-{- This is a naive scan through the git work tree.
+{- This is a cheap scan for failed transfers involving a remote. -}
+failedTransferScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO ()
+failedTransferScan st dstatus transferqueue r = do
+ ts <- runThreadState st $
+ getFailedTransfers $ Remote.uuid r
+ go ts
+ where
+ go [] = noop
+ go ((t, info):ts) = do
+ queueTransferWhenSmall
+ transferqueue dstatus (associatedFile info) t r
+ void $ runThreadState st $ inRepo $
+ liftIO . tryIO . removeFile . failedTransferFile t
+ go ts
+
+{- This is a expensive scan through the full git work tree.
-
- The scan is blocked when the transfer queue gets too large. -}
-scan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO Bool
-scan st dstatus transferqueue r = do
+expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO Bool
+expensiveScan st dstatus transferqueue r = do
g <- runThreadState st $ fromRepo id
files <- LsFiles.inRepo [] g
go files
- inRepo $ transferScanned $ uuid r
return True
where
- go [] = return ()
+ go [] = noop
go (f:fs) = do
v <- runThreadState st $ whenAnnexed check f
case v of
@@ -67,8 +88,7 @@ scan st dstatus transferqueue r = do
| otherwise = return Nothing
u = Remote.uuid r
- enqueue f t = queueTransferAt smallsize Later transferqueue dstatus (Just f) t r
- smallsize = 10
+ enqueue f t = queueTransferWhenSmall transferqueue dstatus (Just f) t r
{- Look directly in remote for the key when it's cheap;
- otherwise rely on the location log. -}
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index aa6192527..18719de8e 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -13,6 +13,7 @@ module Assistant.TransferQueue (
queueTransfers,
queueTransfer,
queueTransferAt,
+ queueTransferWhenSmall,
getNextTransfer,
dequeueTransfer,
) where
@@ -115,6 +116,9 @@ queueTransferAt wantsz schedule q dstatus f t remote = do
else retry -- blocks until queuesize changes
enqueue schedule q dstatus t (stubInfo f remote)
+queueTransferWhenSmall :: TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransferWhenSmall = queueTransferAt 10 Later
+
{- Blocks until a pending transfer is available from the queue,
- and removes it.
-
diff --git a/Command/Move.hs b/Command/Move.hs
index e7c11e80d..7955cecd3 100644
--- a/Command/Move.hs
+++ b/Command/Move.hs
@@ -135,13 +135,12 @@ fromPerform :: Remote -> Bool -> Key -> FilePath -> CommandPerform
fromPerform src move key file = moveLock move key $
ifM (inAnnex key)
( handle move True
- , download (Remote.uuid src) key (Just file) $ do
- showAction $ "from " ++ Remote.name src
- ok <- getViaTmp key $
- Remote.retrieveKeyFile src key (Just file)
- handle move ok
+ , handle move =<< go
)
where
+ go = download (Remote.uuid src) key (Just file) $ do
+ showAction $ "from " ++ Remote.name src
+ getViaTmp key $ Remote.retrieveKeyFile src key (Just file)
handle _ False = stop -- failed
handle False True = next $ return True -- copy complete
handle True True = do -- finish moving
diff --git a/Command/RecvKey.hs b/Command/RecvKey.hs
index ce8bff997..0606f9c51 100644
--- a/Command/RecvKey.hs
+++ b/Command/RecvKey.hs
@@ -13,6 +13,7 @@ import CmdLine
import Annex.Content
import Utility.RsyncFile
import Logs.Transfer
+import Command.SendKey (fieldTransfer)
def :: [Command]
def = [oneShot $ command "recvkey" paramKey seek
@@ -30,7 +31,7 @@ start key = ifM (inAnnex key)
-- forcibly quit after receiving one key,
-- and shutdown cleanly
_ <- shutdown True
- liftIO exitSuccess
- , liftIO exitFailure
+ return True
+ , return False
)
)
diff --git a/Command/SendKey.hs b/Command/SendKey.hs
index 5eca70d24..8f914b5ed 100644
--- a/Command/SendKey.hs
+++ b/Command/SendKey.hs
@@ -12,6 +12,7 @@ import Command
import Annex.Content
import Utility.RsyncFile
import Logs.Transfer
+import qualified Fields
def :: [Command]
def = [oneShot $ command "sendkey" paramKey seek
@@ -24,9 +25,17 @@ start :: Key -> CommandStart
start key = ifM (inAnnex key)
( fieldTransfer Upload key $ do
file <- inRepo $ gitAnnexLocation key
- liftIO $ ifM (rsyncServerSend file)
- ( exitSuccess , exitFailure )
+ liftIO $ rsyncServerSend file
, do
warning "requested key is not present"
liftIO exitFailure
)
+
+fieldTransfer :: Direction -> Key -> Annex Bool -> CommandStart
+fieldTransfer direction key a = do
+ afile <- Fields.getField Fields.associatedFile
+ ok <- maybe a (\u -> runTransfer (Transfer direction (toUUID u) key) afile a)
+ =<< Fields.getField Fields.remoteUUID
+ if ok
+ then liftIO exitSuccess
+ else liftIO exitFailure
diff --git a/Fields.hs b/Fields.hs
index 38427ad05..145a8adca 100644
--- a/Fields.hs
+++ b/Fields.hs
@@ -18,6 +18,9 @@ data Field = Field
, fieldCheck :: String -> Bool
}
+getField :: Field -> Annex (Maybe String)
+getField = Annex.getField . fieldName
+
remoteUUID :: Field
remoteUUID = Field "remoteuuid" $
-- does it look like a UUID?
@@ -27,6 +30,3 @@ associatedFile :: Field
associatedFile = Field "associatedfile" $ \f ->
-- is the file a safe relative filename?
not (isAbsolute f) && not ("../" `isPrefixOf` f)
-
-getField :: Field -> Annex (Maybe String)
-getField = Annex.getField . fieldName
diff --git a/Logs/Transfer.hs b/Logs/Transfer.hs
index 4e43929fc..b412ccd3e 100644
--- a/Logs/Transfer.hs
+++ b/Logs/Transfer.hs
@@ -13,7 +13,6 @@ import Annex.Exception
import qualified Git
import Types.Remote
import Types.Key
-import qualified Fields
import Utility.Percentage
import System.Posix.Types
@@ -66,23 +65,20 @@ percentComplete (Transfer { transferKey = key }) (TransferInfo { bytesComplete =
(\size -> percentage size complete) <$> keySize key
percentComplete _ _ = Nothing
-upload :: UUID -> Key -> AssociatedFile -> Annex a -> Annex a
+upload :: UUID -> Key -> AssociatedFile -> Annex Bool -> Annex Bool
upload u key file a = runTransfer (Transfer Upload u key) file a
-download :: UUID -> Key -> AssociatedFile -> Annex a -> Annex a
+download :: UUID -> Key -> AssociatedFile -> Annex Bool -> Annex Bool
download u key file a = runTransfer (Transfer Download u key) file a
-fieldTransfer :: Direction -> Key -> Annex a -> Annex a
-fieldTransfer direction key a = do
- afile <- Fields.getField Fields.associatedFile
- maybe a (\u -> runTransfer (Transfer direction (toUUID u) key) afile a)
- =<< Fields.getField Fields.remoteUUID
-
{- Runs a transfer action. Creates and locks the lock file while the
- action is running, and stores info in the transfer information
- file. Will throw an error if the transfer is already in progress.
+ -
+ - If the transfer action returns False, the transfer info is
+ - left in the failedTransferDir.
-}
-runTransfer :: Transfer -> Maybe FilePath -> Annex a -> Annex a
+runTransfer :: Transfer -> Maybe FilePath -> Annex Bool -> Annex Bool
runTransfer t file a = do
tfile <- fromRepo $ transferFile t
createAnnexDirectory $ takeDirectory tfile
@@ -95,21 +91,28 @@ runTransfer t file a = do
<*> pure Nothing
<*> pure file
<*> pure False
- bracketIO (prep tfile mode info) (cleanup tfile) a
+ let content = writeTransferInfo info
+ ok <- bracketIO (prep tfile mode content) (cleanup tfile) a
+ unless ok $ failed content
+ return ok
where
- prep tfile mode info = do
+ prep tfile mode content = do
fd <- openFd (transferLockFile tfile) ReadWrite (Just mode)
defaultFileFlags { trunc = True }
locked <- catchMaybeIO $
setLock fd (WriteLock, AbsoluteSeek, 0, 0)
when (locked == Nothing) $
error $ "transfer already in progress"
- writeFile tfile $ writeTransferInfo info
+ writeFile tfile content
return fd
cleanup tfile fd = do
void $ tryIO $ removeFile tfile
void $ tryIO $ removeFile $ transferLockFile tfile
closeFd fd
+ failed content = do
+ failedtfile <- fromRepo $ failedTransferFile t
+ createAnnexDirectory $ takeDirectory failedtfile
+ liftIO $ writeFile failedtfile content
{- If a transfer is still running, returns its TransferInfo. -}
checkTransfer :: Transfer -> Annex (Maybe TransferInfo)
@@ -128,7 +131,7 @@ checkTransfer t = do
Nothing -> return Nothing
Just (pid, _) -> liftIO $
flip catchDefaultIO Nothing $ do
- readTransferInfo pid
+ readTransferInfo (Just pid)
<$> readFile tfile
{- Gets all currently running transfers. -}
@@ -140,15 +143,35 @@ getTransfers = do
filter running $ zip transfers infos
where
findfiles = liftIO . mapM dirContentsRecursive
- =<< mapM (fromRepo . transferDir) [Upload, Download]
+ =<< mapM (fromRepo . transferDir)
+ [Download, Upload]
running (_, i) = isJust i
+{- Gets failed transfers for a given remote UUID. -}
+getFailedTransfers :: UUID -> Annex [(Transfer, TransferInfo)]
+getFailedTransfers u = catMaybes <$> (liftIO . getpairs =<< concat <$> findfiles)
+ where
+ getpairs = mapM $ \f -> do
+ let mt = parseTransferFile f
+ mi <- readTransferInfo Nothing <$> readFile f
+ return $ case (mt, mi) of
+ (Just t, Just i) -> Just (t, i)
+ _ -> Nothing
+ findfiles = liftIO . mapM dirContentsRecursive
+ =<< mapM (fromRepo . failedTransferDir u)
+ [Download, Upload]
+
{- The transfer information file to use for a given Transfer. -}
transferFile :: Transfer -> Git.Repo -> FilePath
transferFile (Transfer direction u key) r = transferDir direction r
</> fromUUID u
</> keyFile key
+{- The transfer information file to use to record a failed Transfer -}
+failedTransferFile :: Transfer -> Git.Repo -> FilePath
+failedTransferFile (Transfer direction u key) r = failedTransferDir u direction r
+ </> keyFile key
+
{- The transfer lock file corresponding to a given transfer info file. -}
transferLockFile :: FilePath -> FilePath
transferLockFile infofile = let (d,f) = splitFileName infofile in
@@ -176,12 +199,12 @@ writeTransferInfo info = unlines
, fromMaybe "" $ associatedFile info -- comes last; arbitrary content
]
-readTransferInfo :: ProcessID -> String -> Maybe TransferInfo
-readTransferInfo pid s =
+readTransferInfo :: (Maybe ProcessID) -> String -> Maybe TransferInfo
+readTransferInfo mpid s =
case bits of
[time] -> TransferInfo
<$> (Just <$> parsePOSIXTime time)
- <*> pure (Just pid)
+ <*> pure mpid
<*> pure Nothing
<*> pure Nothing
<*> pure Nothing
@@ -200,13 +223,21 @@ parsePOSIXTime s = utcTimeToPOSIXSeconds
transferDir :: Direction -> Git.Repo -> FilePath
transferDir direction r = gitAnnexTransferDir r </> showLcDirection direction
+{- The directory holding failed transfer information files for a given
+ - Direction and UUID -}
+failedTransferDir :: UUID -> Direction -> Git.Repo -> FilePath
+failedTransferDir u direction r = gitAnnexTransferDir r
+ </> "failed"
+ </> showLcDirection direction
+ </> fromUUID u
+
{- The directory holding remote uuids that have been scanned for transfers. -}
transferScannedDir :: Git.Repo -> FilePath
transferScannedDir r = gitAnnexTransferDir r </> "scanned"
{- The file indicating whether a remote uuid has been scanned. -}
transferScannedFile :: UUID -> Git.Repo -> FilePath
-transferScannedFile u r = transferScannedDir r </> show u
+transferScannedFile u r = transferScannedDir r </> fromUUID u
{- Checks if a given remote UUID has been scanned for transfers. -}
checkTransferScanned :: UUID -> Git.Repo -> IO Bool