diff options
43 files changed, 335 insertions, 102 deletions
@@ -69,6 +69,7 @@ import Utility.Url import "mtl" Control.Monad.Reader import Control.Concurrent +import Control.Concurrent.Async import qualified Data.Map as M import qualified Data.Set as S @@ -134,6 +135,7 @@ data AnnexState = AnnexState #endif , existinghooks :: M.Map Git.Hook.Hook Bool , desktopnotify :: DesktopNotify + , workers :: [Either AnnexState (Async AnnexState)] } newState :: GitConfig -> Git.Repo -> AnnexState @@ -179,6 +181,7 @@ newState c r = AnnexState #endif , existinghooks = M.empty , desktopnotify = mempty + , workers = [] } {- Makes an Annex state object for the specified git repo. diff --git a/Annex/BranchState.hs b/Annex/BranchState.hs index 889a936b9..2885582cd 100644 --- a/Annex/BranchState.hs +++ b/Annex/BranchState.hs @@ -16,11 +16,9 @@ import qualified Annex getState :: Annex BranchState getState = Annex.getState Annex.branchstate -setState :: BranchState -> Annex () -setState state = Annex.changeState $ \s -> s { Annex.branchstate = state } - changeState :: (BranchState -> BranchState) -> Annex () -changeState changer = setState =<< changer <$> getState +changeState changer = Annex.changeState $ \s -> + s { Annex.branchstate = changer (Annex.branchstate s) } {- Runs an action to check that the index file exists, if it's not been - checked before in this run of git-annex. 
-} diff --git a/Annex/CheckAttr.hs b/Annex/CheckAttr.hs index 46c71fe72..bbe979b3e 100644 --- a/Annex/CheckAttr.hs +++ b/Annex/CheckAttr.hs @@ -7,7 +7,8 @@ module Annex.CheckAttr ( checkAttr, - checkAttrHandle + checkAttrHandle, + checkAttrStop, ) where import Common.Annex @@ -33,3 +34,10 @@ checkAttrHandle = maybe startup return =<< Annex.getState Annex.checkattrhandle h <- inRepo $ Git.checkAttrStart annexAttrs Annex.changeState $ \s -> s { Annex.checkattrhandle = Just h } return h + +checkAttrStop :: Annex () +checkAttrStop = maybe noop stop =<< Annex.getState Annex.checkattrhandle + where + stop h = do + liftIO $ Git.checkAttrStop h + Annex.changeState $ \s -> s { Annex.checkattrhandle = Nothing } diff --git a/Annex/CheckIgnore.hs b/Annex/CheckIgnore.hs index 8d7df1e2c..86b46f7c2 100644 --- a/Annex/CheckIgnore.hs +++ b/Annex/CheckIgnore.hs @@ -8,7 +8,8 @@ module Annex.CheckIgnore ( checkIgnored, - checkIgnoreHandle + checkIgnoreHandle, + checkIgnoreStop ) where import Common.Annex @@ -30,3 +31,11 @@ checkIgnoreHandle = maybe startup return =<< Annex.getState Annex.checkignorehan warning "The installed version of git is too old for .gitignores to be honored by git-annex." Annex.changeState $ \s -> s { Annex.checkignorehandle = Just v } return v + +checkIgnoreStop :: Annex () +checkIgnoreStop = maybe noop stop =<< Annex.getState Annex.checkignorehandle + where + stop (Just h) = do + liftIO $ Git.checkIgnoreStop h + Annex.changeState $ \s -> s { Annex.checkignorehandle = Nothing } + stop Nothing = noop diff --git a/Annex/Concurrent.hs b/Annex/Concurrent.hs new file mode 100644 index 000000000..d3585e04f --- /dev/null +++ b/Annex/Concurrent.hs @@ -0,0 +1,65 @@ +{- git-annex concurrent state + - + - Copyright 2015 Joey Hess <id@joeyh.name> + - + - Licensed under the GNU GPL version 3 or higher. 
+ -} + +module Annex.Concurrent where + +import Common.Annex +import Annex +import Annex.CatFile +import Annex.CheckAttr +import Annex.CheckIgnore + +import qualified Data.Map as M + +{- Allows forking off a thread that uses a copy of the current AnnexState + - to run an Annex action. + - + - The returned IO action can be used to start the thread. + - It returns an Annex action that must be run in the original + - calling context to merge the forked AnnexState back into the + - current AnnexState. + -} +forkState :: Annex a -> Annex (IO (Annex a)) +forkState a = do + st <- dupState + return $ do + (ret, newst) <- run st a + return $ do + mergeState newst + return ret + +{- Returns a copy of the current AnnexState that is safe to be + - used when forking off a thread. + - + - After an Annex action is run using this AnnexState, it + - should be merged back into the current Annex's state, + - by calling mergeState. + -} +dupState :: Annex AnnexState +dupState = do + st <- Annex.getState id + -- avoid sharing eg, open file handles + return $ st + { Annex.workers = [] + , Annex.catfilehandles = M.empty + , Annex.checkattrhandle = Nothing + , Annex.checkignorehandle = Nothing + } + +{- Merges the passed AnnexState into the current Annex state. + - Also closes various handles in it. -} +mergeState :: AnnexState -> Annex () +mergeState st = do + st' <- liftIO $ snd <$> run st closehandles + forM_ (M.toList $ Annex.cleanup st') $ + uncurry addCleanup + changeState $ \s -> s { errcounter = errcounter s + errcounter st' } + where + closehandles = do + catFileStop + checkAttrStop + checkIgnoreStop diff --git a/Annex/Drop.hs b/Annex/Drop.hs index a99a1edff..973e51348 100644 --- a/Annex/Drop.hs +++ b/Annex/Drop.hs @@ -42,7 +42,7 @@ type Reason = String - The runner is used to run commands, and so can be either callCommand - or commandAction. 
-} -handleDropsFrom :: [UUID] -> [Remote] -> Reason -> Bool -> Key -> AssociatedFile -> Maybe Remote -> CommandActionRunner -> Annex () +handleDropsFrom :: [UUID] -> [Remote] -> Reason -> Bool -> Key -> AssociatedFile -> Maybe Remote -> (CommandStart -> CommandCleanup) -> Annex () handleDropsFrom locs rs reason fromhere key afile knownpresentremote runner = do fs <- ifM isDirect ( do diff --git a/BuildFlags.hs b/BuildFlags.hs index 7ae526f63..a0f0ac298 100644 --- a/BuildFlags.hs +++ b/BuildFlags.hs @@ -84,7 +84,7 @@ buildFlags = filter (not . null) #ifdef WITH_TORRENTPARSER , "TorrentParser" #else -#warning Building without haskell torrent library; will instead use btshowmetainfo to parse torrent files. + #endif #ifdef WITH_EKG , "EKG" diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index b566621bb..2838e4ff8 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -1,6 +1,6 @@ {- git-annex command-line actions - - - Copyright 2010-2014 Joey Hess <id@joeyh.name> + - Copyright 2010-2015 Joey Hess <id@joeyh.name> - - Licensed under the GNU GPL version 3 or higher. -} @@ -9,10 +9,15 @@ module CmdLine.Action where import Common.Annex import qualified Annex +import Annex.Concurrent import Types.Command import qualified Annex.Queue +import Messages.Internal +import Types.Messages -type CommandActionRunner = CommandStart -> CommandCleanup +import Control.Concurrent.Async +import Control.Exception (throwIO) +import Data.Either {- Runs a command, starting with the check stage, and then - the seek stage. 
Finishes by running the continutation, and @@ -22,6 +27,7 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa mapM_ runCheck c Annex.changeState $ \s -> s { Annex.errcounter = 0 } seek params + finishCommandActions cont showerrcount =<< Annex.getState Annex.errcounter where @@ -33,9 +39,77 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa - including by throwing IO errors (but other errors terminate the whole - command). - - - This should only be run in the seek stage. -} -commandAction :: CommandActionRunner -commandAction a = account =<< tryIO go + - When concurrency is enabled, a thread is forked off to run the action + - in the background, as soon as a free slot is available. + + - This should only be run in the seek stage. + -} +commandAction :: CommandStart -> Annex () +commandAction a = withOutputType go + where + go (ParallelOutput n) = do + ws <- Annex.getState Annex.workers + (st, ws') <- if null ws + then do + st <- dupState + return (st, replicate (n-1) (Left st)) + else do + l <- liftIO $ drainTo (n-1) ws + findFreeSlot l + w <- liftIO $ async $ snd <$> Annex.run st run + Annex.changeState $ \s -> s { Annex.workers = Right w:ws' } + go _ = run + run = void $ includeCommandAction a + +{- Waits for any forked off command actions to finish. + - + - Merge together the cleanup actions of all the AnnexStates used by + - threads, into the current Annex's state, so they'll run at shutdown. + - + - Also merge together the errcounters of the AnnexStates. + -} +finishCommandActions :: Annex () +finishCommandActions = do + l <- liftIO . drainTo 0 =<< Annex.getState Annex.workers + forM_ (lefts l) mergeState + +{- Wait for Asyncs from the list to finish, replacing them with their + - final AnnexStates, until the list of remaining Asyncs is not larger + - than the specified size, then returns the new list. 
+ - + - If the action throws an exception, it is propagated, but first + - all other actions are waited for, to allow for a clean shutdown. + -} +drainTo + :: Int + -> [Either Annex.AnnexState (Async Annex.AnnexState)] + -> IO [Either Annex.AnnexState (Async Annex.AnnexState)] +drainTo sz l + | null as || sz >= length as = return l + | otherwise = do + (done, ret) <- waitAnyCatch as + let as' = filter (/= done) as + case ret of + Left e -> do + void $ drainTo 0 (map Left sts ++ map Right as') + throwIO e + Right st -> do + drainTo sz $ map Left (st:sts) ++ map Right as' + where + (sts, as) = partitionEithers l + +findFreeSlot :: [Either Annex.AnnexState (Async Annex.AnnexState)] -> Annex (Annex.AnnexState, [Either Annex.AnnexState (Async Annex.AnnexState)]) +findFreeSlot = go [] + where + go c [] = do + st <- dupState + return (st, c) + go c (Left st:rest) = return (st, c ++ rest) + go c (v:rest) = go (v:c) rest + +{- Like commandAction, but without the concurrency. -} +includeCommandAction :: CommandStart -> CommandCleanup +includeCommandAction a = account =<< tryIO go where go = do Annex.Queue.flushWhenFull @@ -53,7 +127,7 @@ commandAction a = account =<< tryIO go {- Runs a single command action through the start, perform and cleanup - stages, without catching errors. Useful if one command wants to run - part of another command. 
-} -callCommandAction :: CommandActionRunner +callCommandAction :: CommandStart -> CommandCleanup callCommandAction = start where start = stage $ maybe skip perform diff --git a/CmdLine/GitAnnex/Options.hs b/CmdLine/GitAnnex/Options.hs index be1c74ede..38fa93090 100644 --- a/CmdLine/GitAnnex/Options.hs +++ b/CmdLine/GitAnnex/Options.hs @@ -138,6 +138,14 @@ jsonOption :: Option jsonOption = Option ['j'] ["json"] (NoArg (Annex.setOutput JSONOutput)) "enable JSON output" +jobsOption :: Option +jobsOption = Option ['J'] ["jobs"] (ReqArg set paramNumber) + "enable concurrent jobs" + where + set s = case readish s of + Nothing -> error "Bad --jobs number" + Just n -> Annex.setOutput (ParallelOutput n) + timeLimitOption :: Option timeLimitOption = Option ['T'] ["time-limit"] (ReqArg Limit.addTimeLimit paramTime) diff --git a/CmdLine/Seek.hs b/CmdLine/Seek.hs index 96076261f..3166ab83d 100644 --- a/CmdLine/Seek.hs +++ b/CmdLine/Seek.hs @@ -84,7 +84,7 @@ withFilesInRefs a = mapM_ go case v of Nothing -> noop Just k -> whenM (matcher $ MatchingKey k) $ - void $ commandAction $ a f k + commandAction $ a f k withPathContents :: ((FilePath, FilePath) -> CommandStart) -> CommandSeek withPathContents a params = do diff --git a/Command/Fsck.hs b/Command/Fsck.hs index 479f4521c..8414b5b26 100644 --- a/Command/Fsck.hs +++ b/Command/Fsck.hs @@ -136,7 +136,7 @@ performRemote key file backend numcopies remote = cleanup cleanup `after` a tmp getfile tmp = ifM (checkDiskSpace (Just tmp) key 0) - ( ifM (Remote.retrieveKeyFileCheap remote key tmp) + ( ifM (Remote.retrieveKeyFileCheap remote key (Just file) tmp) ( return (Just True) , ifM (Annex.getState Annex.fast) ( return Nothing diff --git a/Command/Get.hs b/Command/Get.hs index 7e95493eb..380a68097 100644 --- a/Command/Get.hs +++ b/Command/Get.hs @@ -21,7 +21,7 @@ cmd = [withOptions getOptions $ command "get" paramPaths seek SectionCommon "make content of annexed files available"] getOptions :: [Option] -getOptions = fromOption : 
annexedMatchingOptions ++ keyOptions ++ [autoOption] +getOptions = fromOption : autoOption : jobsOption : annexedMatchingOptions ++ keyOptions seek :: CommandSeek seek ps = do diff --git a/Command/Mirror.hs b/Command/Mirror.hs index 14f70d3b6..535dc64b6 100644 --- a/Command/Mirror.hs +++ b/Command/Mirror.hs @@ -21,7 +21,7 @@ cmd = [withOptions mirrorOptions $ command "mirror" paramPaths seek SectionCommon "mirror content of files to/from another repository"] mirrorOptions :: [Option] -mirrorOptions = fromToOptions ++ annexedMatchingOptions ++ keyOptions +mirrorOptions = fromToOptions ++ [jobsOption] ++ annexedMatchingOptions ++ keyOptions seek :: CommandSeek seek ps = do diff --git a/Command/Move.hs b/Command/Move.hs index bdc8018ba..91f7c8ea7 100644 --- a/Command/Move.hs +++ b/Command/Move.hs @@ -22,7 +22,7 @@ cmd = [withOptions moveOptions $ command "move" paramPaths seek SectionCommon "move content of files to/from another repository"] moveOptions :: [Option] -moveOptions = fromToOptions ++ keyOptions ++ annexedMatchingOptions +moveOptions = fromToOptions ++ [jobsOption] ++ keyOptions ++ annexedMatchingOptions seek :: CommandSeek seek ps = do diff --git a/Command/Sync.hs b/Command/Sync.hs index 130693909..7c1ed0067 100644 --- a/Command/Sync.hs +++ b/Command/Sync.hs @@ -389,7 +389,7 @@ syncFile rs f k = do u <- getUUID let locs' = concat [[u | got], putrs, locs] - -- Using callCommandAction rather than commandAction for drops, + -- Using callCommandAction rather than includeCommandAction for drops, -- because a failure to drop does not mean the sync failed. 
handleDropsFrom locs' rs "unwanted" True k (Just f) Nothing callCommandAction @@ -403,7 +403,7 @@ syncFile rs f k = do ( return [ get have ] , return [] ) - get have = commandAction $ do + get have = includeCommandAction $ do showStart "get" f next $ next $ getViaTmp k $ \dest -> getKeyFile' k (Just f) dest have @@ -415,7 +415,7 @@ syncFile rs f k = do , return [] ) put dest = do - ok <- commandAction $ do + ok <- includeCommandAction $ do showStart "copy" f Command.Move.toStart' dest False (Just f) k return (ok, if ok then Just (Remote.uuid dest) else Nothing) diff --git a/Command/TestRemote.hs b/Command/TestRemote.hs index 4a65aa4ec..b0f2c28bb 100644 --- a/Command/TestRemote.hs +++ b/Command/TestRemote.hs @@ -171,7 +171,7 @@ testUnavailable st r k = Remote.retrieveKeyFile r k Nothing dest nullMeterUpdate , check (== Right False) "retrieveKeyFileCheap" $ getViaTmp k $ \dest -> - Remote.retrieveKeyFileCheap r k dest + Remote.retrieveKeyFileCheap r k Nothing dest ] where check checkval desc a = testCase desc $ do diff --git a/Messages.hs b/Messages.hs index 5dffbd8de..f6d26db9d 100644 --- a/Messages.hs +++ b/Messages.hs @@ -19,6 +19,7 @@ module Messages ( showEndOk, showEndFail, showEndResult, + endResult, toplevelWarning, warning, warningIO, @@ -111,11 +112,11 @@ showEndFail :: Annex () showEndFail = showEndResult False showEndResult :: Bool -> Annex () -showEndResult ok = handleMessage (JSON.end ok) $ putStrLn msg - where - msg - | ok = "ok" - | otherwise = "failed" +showEndResult ok = handleMessage (JSON.end ok) $ putStrLn $ endResult ok + +endResult :: Bool -> String +endResult True = "ok" +endResult False = "failed" toplevelWarning :: Bool -> String -> Annex () toplevelWarning makeway s = warning' makeway ("git-annex: " ++ s) @@ -191,6 +192,6 @@ disableDebugOutput = updateGlobalLogger rootLoggerName $ setLevel NOTICE commandProgressDisabled :: Annex Bool commandProgressDisabled = withOutputType $ \t -> return $ case t of QuietOutput -> True - ProgressOutput -> 
True + ParallelOutput _ -> True JSONOutput -> True NormalOutput -> False diff --git a/Messages/Internal.hs b/Messages/Internal.hs index 1dd856b5e..2495f4fd3 100644 --- a/Messages/Internal.hs +++ b/Messages/Internal.hs @@ -17,7 +17,7 @@ handleMessage json normal = withOutputType go where go NormalOutput = liftIO normal go QuietOutput = q - go ProgressOutput = q + go (ParallelOutput _) = q go JSONOutput = liftIO $ flushed json q :: Monad m => m () diff --git a/Messages/Progress.hs b/Messages/Progress.hs index e3df73ea4..9bc335570 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -15,34 +15,68 @@ import Types import Types.Messages import Types.Key -import Data.Progress.Meter -import Data.Progress.Tracker -import Data.Quantity +import System.Console.AsciiProgress +import qualified System.Console.Terminal.Size as Terminal +import Control.Concurrent {- Shows a progress meter while performing a transfer of a key. - The action is passed a callback to use to update the meter. -} -metered :: Maybe MeterUpdate -> Key -> (MeterUpdate -> Annex a) -> Annex a -metered combinemeterupdate key a = go (keySize key) +metered :: Maybe MeterUpdate -> Key -> AssociatedFile -> (MeterUpdate -> Annex a) -> Annex a +metered combinemeterupdate key af a = case keySize key of + Nothing -> nometer + Just size -> withOutputType (go $ fromInteger size) where - go (Just size) = meteredBytes combinemeterupdate size a - go _ = a (const noop) - -{- Shows a progress meter while performing an action on a given number - - of bytes. 
-} -meteredBytes :: Maybe MeterUpdate -> Integer -> (MeterUpdate -> Annex a) -> Annex a -meteredBytes combinemeterupdate size a = withOutputType go - where - go NormalOutput = do - progress <- liftIO $ newProgress "" size - meter <- liftIO $ newMeter progress "B" 25 (renderNums binaryOpts 1) + go _ QuietOutput = nometer + go _ JSONOutput = nometer + go size _ = do showOutput - r <- a $ \n -> liftIO $ do - setP progress $ fromBytesProcessed n - displayMeter stdout meter - maybe noop (\m -> m n) combinemeterupdate - liftIO $ clearMeter stdout meter + liftIO $ putStrLn "" + + cols <- liftIO $ maybe 79 Terminal.width <$> Terminal.size + let desc = truncatepretty cols $ fromMaybe (key2file key) af + + result <- liftIO newEmptyMVar + pg <- liftIO $ newProgressBar def + { pgWidth = cols + , pgFormat = desc ++ " :percent :bar ETA :eta" + , pgTotal = size + , pgOnCompletion = do + ok <- takeMVar result + putStrLn $ desc ++ " " ++ endResult ok + } + r <- a $ liftIO . pupdate pg + + liftIO $ do + -- See if the progress bar is complete or not. + sofar <- stCompleted <$> getProgressStats pg + putMVar result (sofar >= size) + -- May not be actually complete if the action failed, + -- but this just clears the progress bar. + complete pg + return r - go _ = a (const noop) + + pupdate pg n = do + let i = fromBytesProcessed n + sofar <- stCompleted <$> getProgressStats pg + when (i > sofar) $ + tickN pg (i - sofar) + threadDelay 100 + maybe noop (\m -> m n) combinemeterupdate + + nometer = a (const noop) + + truncatepretty n s + | length s > n = take (n-2) s ++ ".." + | otherwise = s + +{- Use when the progress meter is only desired for parallel + - mode; as when a command's own progress output is preferred. 
-} +parallelMetered :: Maybe MeterUpdate -> Key -> AssociatedFile -> (MeterUpdate -> Annex a) -> Annex a +parallelMetered combinemeterupdate key af a = withOutputType go + where + go (ParallelOutput _) = metered combinemeterupdate key af a + go _ = a (fromMaybe (const noop) combinemeterupdate) {- Progress dots. -} showProgressDots :: Annex () @@ -84,5 +118,5 @@ mkStderrRelayer = do mkStderrEmitter :: Annex (String -> IO ()) mkStderrEmitter = withOutputType go where - go ProgressOutput = return $ \s -> hPutStrLn stderr ("E: " ++ s) + go (ParallelOutput _) = return $ \s -> hPutStrLn stderr ("E: " ++ s) go _ = return (hPutStrLn stderr) diff --git a/Remote/BitTorrent.hs b/Remote/BitTorrent.hs index baba2e23e..05326e390 100644 --- a/Remote/BitTorrent.hs +++ b/Remote/BitTorrent.hs @@ -93,8 +93,8 @@ downloadKey key _file dest p = , return False ) -downloadKeyCheap :: Key -> FilePath -> Annex Bool -downloadKeyCheap _ _ = return False +downloadKeyCheap :: Key -> AssociatedFile -> FilePath -> Annex Bool +downloadKeyCheap _ _ _ = return False uploadKey :: Key -> AssociatedFile -> MeterUpdate -> Annex Bool uploadKey _ _ _ = do diff --git a/Remote/Bup.hs b/Remote/Bup.hs index 42f17e921..b3152afcf 100644 --- a/Remote/Bup.hs +++ b/Remote/Bup.hs @@ -148,8 +148,8 @@ retrieve buprepo = byteRetriever $ \k sink -> do liftIO (hClose h >> forceSuccessProcess p pid) `after` (sink =<< liftIO (L.hGetContents h)) -retrieveCheap :: BupRepo -> Key -> FilePath -> Annex Bool -retrieveCheap _ _ _ = return False +retrieveCheap :: BupRepo -> Key -> AssociatedFile -> FilePath -> Annex Bool +retrieveCheap _ _ _ _ = return False {- Cannot revert having stored a key in bup, but at least the data for the - key will be used for deltaing data of other keys stored later. 
diff --git a/Remote/Ddar.hs b/Remote/Ddar.hs index 7495fcd42..a24960935 100644 --- a/Remote/Ddar.hs +++ b/Remote/Ddar.hs @@ -142,8 +142,8 @@ retrieve ddarrepo = byteRetriever $ \k sink -> do liftIO (hClose h >> forceSuccessProcess p pid) `after` (sink =<< liftIO (L.hGetContents h)) -retrieveCheap :: Key -> FilePath -> Annex Bool -retrieveCheap _ _ = return False +retrieveCheap :: Key -> AssociatedFile -> FilePath -> Annex Bool +retrieveCheap _ _ _ = return False remove :: DdarRepo -> Remover remove ddarrepo key = do diff --git a/Remote/Directory.hs b/Remote/Directory.hs index 916013172..8b727c77e 100644 --- a/Remote/Directory.hs +++ b/Remote/Directory.hs @@ -156,12 +156,12 @@ retrieve d (LegacyChunks _) = Legacy.retrieve locations d retrieve d _ = simplyPrepare $ byteRetriever $ \k sink -> sink =<< liftIO (L.readFile =<< getLocation d k) -retrieveCheap :: FilePath -> ChunkConfig -> Key -> FilePath -> Annex Bool +retrieveCheap :: FilePath -> ChunkConfig -> Key -> AssociatedFile -> FilePath -> Annex Bool -- no cheap retrieval possible for chunks -retrieveCheap _ (UnpaddedChunks _) _ _ = return False -retrieveCheap _ (LegacyChunks _) _ _ = return False +retrieveCheap _ (UnpaddedChunks _) _ _ _ = return False +retrieveCheap _ (LegacyChunks _) _ _ _ = return False #ifndef mingw32_HOST_OS -retrieveCheap d NoChunks k f = liftIO $ catchBoolIO $ do +retrieveCheap d NoChunks k _af f = liftIO $ catchBoolIO $ do file <- absPath =<< getLocation d k ifM (doesFileExist file) ( do @@ -170,7 +170,7 @@ retrieveCheap d NoChunks k f = liftIO $ catchBoolIO $ do , return False ) #else -retrieveCheap _ _ _ _ = return False +retrieveCheap _ _ _ _ _ = return False #endif remove :: FilePath -> Remover diff --git a/Remote/External.hs b/Remote/External.hs index adfd79113..d09e1f9b3 100644 --- a/Remote/External.hs +++ b/Remote/External.hs @@ -56,7 +56,7 @@ gen r u c gc = do , name = Git.repoDescribe r , storeKey = storeKeyDummy , retrieveKeyFile = retreiveKeyFileDummy - , retrieveKeyFileCheap 
= \_ _ -> return False + , retrieveKeyFileCheap = \_ _ _ -> return False , removeKey = removeKeyDummy , checkPresent = checkPresentDummy , checkPresentCheap = False diff --git a/Remote/GCrypt.hs b/Remote/GCrypt.hs index 01efc6060..fc0c27f37 100644 --- a/Remote/GCrypt.hs +++ b/Remote/GCrypt.hs @@ -108,7 +108,7 @@ gen' r u c gc = do , name = Git.repoDescribe r , storeKey = storeKeyDummy , retrieveKeyFile = retreiveKeyFileDummy - , retrieveKeyFileCheap = \_ _ -> return False + , retrieveKeyFileCheap = \_ _ _ -> return False , removeKey = removeKeyDummy , checkPresent = checkPresentDummy , checkPresentCheap = repoCheap r diff --git a/Remote/Git.hs b/Remote/Git.hs index 4804cb10e..170c6fbf6 100644 --- a/Remote/Git.hs +++ b/Remote/Git.hs @@ -52,6 +52,7 @@ import qualified Remote.GCrypt import Annex.Path import Creds import Annex.CatFile +import Messages.Progress import Control.Concurrent import Control.Concurrent.MSampleVar @@ -355,9 +356,11 @@ dropKey r key {- Tries to copy a key's content from a remote's annex to a file. -} copyFromRemote :: Remote -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex Bool -copyFromRemote r key file dest _p = copyFromRemote' r key file dest -copyFromRemote' :: Remote -> Key -> AssociatedFile -> FilePath -> Annex Bool -copyFromRemote' r key file dest +copyFromRemote r key file dest p = parallelMetered (Just p) key file $ + copyFromRemote' r key file dest + +copyFromRemote' :: Remote -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex Bool +copyFromRemote' r key file dest meterupdate | not $ Git.repoIsUrl (repo r) = guardUsable (repo r) (return False) $ do params <- Ssh.rsyncParams r Download u <- getUUID @@ -435,7 +438,9 @@ copyFromRemote' r key file dest send bytes forever $ send =<< readSV v - let feeder = writeSV v . 
fromBytesProcessed + let feeder = \n -> do + meterupdate n + writeSV v (fromBytesProcessed n) let cleanup = do void $ tryIO $ killThread tid tryNonAsync $ @@ -443,9 +448,9 @@ copyFromRemote' r key file dest =<< tryTakeMVar pidv bracketIO noop (const cleanup) (const $ a feeder) -copyFromRemoteCheap :: Remote -> Key -> FilePath -> Annex Bool +copyFromRemoteCheap :: Remote -> Key -> AssociatedFile -> FilePath -> Annex Bool #ifndef mingw32_HOST_OS -copyFromRemoteCheap r key file +copyFromRemoteCheap r key af file | not $ Git.repoIsUrl (repo r) = guardUsable (repo r) (return False) $ liftIO $ do loc <- gitAnnexLocation key (repo r) $ fromJust $ remoteGitConfig $ gitconfig r @@ -459,17 +464,21 @@ copyFromRemoteCheap r key file ) | Git.repoIsSsh (repo r) = ifM (Annex.Content.preseedTmp key file) - ( copyFromRemote' r key Nothing file + ( parallelMetered Nothing key af $ + copyFromRemote' r key af file , return False ) | otherwise = return False #else -copyFromRemoteCheap _ _ _ = return False +copyFromRemoteCheap _ _ _ _ = return False #endif {- Tries to copy a key's content to a remote's annex. -} copyToRemote :: Remote -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool -copyToRemote r key file p +copyToRemote r key file p = parallelMetered (Just p) key file $ copyToRemote' r key file + +copyToRemote' :: Remote -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool +copyToRemote' r key file p | not $ Git.repoIsUrl (repo r) = guardUsable (repo r) (return False) $ commitOnCleanup r $ copylocal =<< Annex.Content.prepSendAnnex key diff --git a/Remote/Glacier.hs b/Remote/Glacier.hs index 289008266..75b264bac 100644 --- a/Remote/Glacier.hs +++ b/Remote/Glacier.hs @@ -162,8 +162,8 @@ retrieve r k sink = go =<< glacierEnv c u showLongNote "Recommend you wait up to 4 hours, and then run this command again." 
return ok -retrieveCheap :: Remote -> Key -> FilePath -> Annex Bool -retrieveCheap _ _ _ = return False +retrieveCheap :: Remote -> Key -> AssociatedFile -> FilePath -> Annex Bool +retrieveCheap _ _ _ _ = return False remove :: Remote -> Remover remove r k = glacierAction r diff --git a/Remote/Helper/Hooks.hs b/Remote/Helper/Hooks.hs index 3765281be..53bb370a6 100644 --- a/Remote/Helper/Hooks.hs +++ b/Remote/Helper/Hooks.hs @@ -36,7 +36,7 @@ addHooks' r starthook stophook = r' r' = r { storeKey = \k f p -> wrapper $ storeKey r k f p , retrieveKeyFile = \k f d p -> wrapper $ retrieveKeyFile r k f d p - , retrieveKeyFileCheap = \k f -> wrapper $ retrieveKeyFileCheap r k f + , retrieveKeyFileCheap = \k af f -> wrapper $ retrieveKeyFileCheap r k af f , removeKey = wrapper . removeKey r , checkPresent = wrapper . checkPresent r } diff --git a/Remote/Helper/Special.hs b/Remote/Helper/Special.hs index 7dd861a4e..483ef576e 100644 --- a/Remote/Helper/Special.hs +++ b/Remote/Helper/Special.hs @@ -157,10 +157,10 @@ specialRemote' :: SpecialRemoteCfg -> RemoteModifier specialRemote' cfg c preparestorer prepareretriever prepareremover preparecheckpresent baser = encr where encr = baser - { storeKey = \k _f p -> cip >>= storeKeyGen k p - , retrieveKeyFile = \k _f d p -> cip >>= retrieveKeyFileGen k d p - , retrieveKeyFileCheap = \k d -> cip >>= maybe - (retrieveKeyFileCheap baser k d) + { storeKey = \k f p -> cip >>= storeKeyGen k f p + , retrieveKeyFile = \k f d p -> cip >>= retrieveKeyFileGen k f d p + , retrieveKeyFileCheap = \k f d -> cip >>= maybe + (retrieveKeyFileCheap baser k f d) -- retrieval of encrypted keys is never cheap (\_ -> return False) , removeKey = \k -> cip >>= removeKeyGen k @@ -182,10 +182,10 @@ specialRemote' cfg c preparestorer prepareretriever prepareremover preparecheckp safely a = catchNonAsync a (\e -> warning (show e) >> return False) -- chunk, then encrypt, then feed to the storer - storeKeyGen k p enc = safely $ preparestorer k $ safely . 
go + storeKeyGen k f p enc = safely $ preparestorer k $ safely . go where go (Just storer) = sendAnnex k rollback $ \src -> - displayprogress p k $ \p' -> + displayprogress p k f $ \p' -> storeChunks (uuid baser) chunkconfig k src p' (storechunk enc storer) (checkPresent baser) @@ -200,10 +200,10 @@ specialRemote' cfg c preparestorer prepareretriever prepareremover preparecheckp storer (enck k) (ByteContent encb) p -- call retriever to get chunks; decrypt them; stream to dest file - retrieveKeyFileGen k dest p enc = + retrieveKeyFileGen k f dest p enc = safely $ prepareretriever k $ safely . go where - go (Just retriever) = displayprogress p k $ \p' -> + go (Just retriever) = displayprogress p k f $ \p' -> retrieveChunks retriever (uuid baser) chunkconfig enck k dest p' (sink dest enc) go Nothing = return False @@ -223,8 +223,8 @@ specialRemote' cfg c preparestorer prepareretriever prepareremover preparecheckp chunkconfig = chunkConfig cfg - displayprogress p k a - | displayProgress cfg = metered (Just p) k a + displayprogress p k f a + | displayProgress cfg = metered (Just p) k f a | otherwise = a p {- Sink callback for retrieveChunks. 
Stores the file content into the diff --git a/Remote/Hook.hs b/Remote/Hook.hs index 6df326295..9abc4e303 100644 --- a/Remote/Hook.hs +++ b/Remote/Hook.hs @@ -130,8 +130,8 @@ retrieve h = fileRetriever $ \d k _p -> unlessM (runHook h "retrieve" k (Just d) $ return True) $ error "failed to retrieve content" -retrieveCheap :: HookName -> Key -> FilePath -> Annex Bool -retrieveCheap _ _ _ = return False +retrieveCheap :: HookName -> Key -> AssociatedFile -> FilePath -> Annex Bool +retrieveCheap _ _ _ _ = return False remove :: HookName -> Remover remove h k = runHook h "remove" k Nothing $ return True diff --git a/Remote/Rsync.hs b/Remote/Rsync.hs index a882e081d..2c8b17884 100644 --- a/Remote/Rsync.hs +++ b/Remote/Rsync.hs @@ -191,8 +191,8 @@ retrieve o f k p = unlessM (rsyncRetrieve o k f (Just p)) $ error "rsync failed" -retrieveCheap :: RsyncOpts -> Key -> FilePath -> Annex Bool -retrieveCheap o k f = ifM (preseedTmp k f) ( rsyncRetrieve o k f Nothing , return False ) +retrieveCheap :: RsyncOpts -> Key -> AssociatedFile -> FilePath -> Annex Bool +retrieveCheap o k _af f = ifM (preseedTmp k f) ( rsyncRetrieve o k f Nothing , return False ) remove :: RsyncOpts -> Remover remove o k = do diff --git a/Remote/S3.hs b/Remote/S3.hs index b86b17d56..21ab45674 100644 --- a/Remote/S3.hs +++ b/Remote/S3.hs @@ -238,8 +238,8 @@ retrieve h = fileRetriever $ \f k p -> liftIO $ runResourceT $ do S.hPut fh bs sinkprogressfile fh meterupdate sofar' -retrieveCheap :: Key -> FilePath -> Annex Bool -retrieveCheap _ _ = return False +retrieveCheap :: Key -> AssociatedFile -> FilePath -> Annex Bool +retrieveCheap _ _ _ = return False {- Internet Archive doesn't easily allow removing content. 
- While it may remove the file, there are generally other files diff --git a/Remote/Tahoe.hs b/Remote/Tahoe.hs index bc4789e57..4a5216194 100644 --- a/Remote/Tahoe.hs +++ b/Remote/Tahoe.hs @@ -70,7 +70,7 @@ gen r u c gc = do , name = Git.repoDescribe r , storeKey = store u hdl , retrieveKeyFile = retrieve u hdl - , retrieveKeyFileCheap = \_ _ -> return False + , retrieveKeyFileCheap = \_ _ _ -> return False , removeKey = remove , checkPresent = checkKey u hdl , checkPresentCheap = False diff --git a/Remote/Web.hs b/Remote/Web.hs index a4a484ca3..102972b02 100644 --- a/Remote/Web.hs +++ b/Remote/Web.hs @@ -90,8 +90,8 @@ downloadKey key _file dest _p = get =<< getWebUrls key #endif _ -> downloadUrl [u'] dest -downloadKeyCheap :: Key -> FilePath -> Annex Bool -downloadKeyCheap _ _ = return False +downloadKeyCheap :: Key -> AssociatedFile -> FilePath -> Annex Bool +downloadKeyCheap _ _ _ = return False uploadKey :: Key -> AssociatedFile -> MeterUpdate -> Annex Bool uploadKey _ _ _ = do diff --git a/Remote/WebDAV.hs b/Remote/WebDAV.hs index aaebecf41..3c414f003 100644 --- a/Remote/WebDAV.hs +++ b/Remote/WebDAV.hs @@ -116,8 +116,8 @@ finalizeStore baseurl tmp dest = do maybe noop (void . 
mkColRecursive) (locationParent dest) moveDAV baseurl tmp dest -retrieveCheap :: Key -> FilePath -> Annex Bool -retrieveCheap _ _ = return False +retrieveCheap :: Key -> AssociatedFile -> FilePath -> Annex Bool +retrieveCheap _ _ _ = return False retrieve :: ChunkConfig -> Maybe DavHandle -> Retriever retrieve _ Nothing = error "unable to connect" diff --git a/Types/Messages.hs b/Types/Messages.hs index a437d86ef..c3696c015 100644 --- a/Types/Messages.hs +++ b/Types/Messages.hs @@ -9,7 +9,7 @@ module Types.Messages where import Data.Default -data OutputType = NormalOutput | QuietOutput | ProgressOutput | JSONOutput +data OutputType = NormalOutput | QuietOutput | ParallelOutput Int | JSONOutput data SideActionBlock = NoBlock | StartBlock | InBlock deriving (Eq) diff --git a/Types/Remote.hs b/Types/Remote.hs index ad8c0c4a7..4237f5e00 100644 --- a/Types/Remote.hs +++ b/Types/Remote.hs @@ -68,7 +68,7 @@ data RemoteA a = Remote { retrieveKeyFile :: Key -> AssociatedFile -> FilePath -> MeterUpdate -> a Bool, -- Retrieves a key's contents to a tmp file, if it can be done cheaply. -- It's ok to create a symlink or hardlink. - retrieveKeyFileCheap :: Key -> FilePath -> a Bool, + retrieveKeyFileCheap :: Key -> AssociatedFile -> FilePath -> a Bool, -- removes a key's contents (succeeds if the contents are not present) removeKey :: Key -> a Bool, -- Checks if a key is present in the remote. diff --git a/debian/changelog b/debian/changelog index 5057e9dab..53a59875d 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,6 @@ + * get, move, copy, mirror: Concurrent downloads and uploads are + now supported! 
For example: git-annex get -J10 + git-annex (5.20150508.2) UNRELEASED; urgency=medium * import: Refuse to import files that are within the work tree, as that diff --git a/doc/git-annex-copy.mdwn b/doc/git-annex-copy.mdwn index 25773a4fe..19d328c19 100644 --- a/doc/git-annex-copy.mdwn +++ b/doc/git-annex-copy.mdwn @@ -24,6 +24,11 @@ Copies the content of files from or to another remote. Use this option to copy the content of files from the local repository to the specified remote. +* `--jobs=N` `-JN` + + Enables parallel transfers with up to the specified number of jobs + running at once. For example: `-J10` + * `--auto` Rather than copying all files, only copy files that don't yet have diff --git a/doc/git-annex-get.mdwn b/doc/git-annex-get.mdwn index 7d4da59ef..da9c8c05a 100644 --- a/doc/git-annex-get.mdwn +++ b/doc/git-annex-get.mdwn @@ -27,6 +27,11 @@ or transferring them from some kind of key-value store. Any files that are not available on the remote will be silently skipped. +* `--jobs=N` `-JN` + + Enables parallel download with up to the specified number of jobs + running at once. For example: `-J10` + * `--all` Rather than specifying a filename or path to get, this option can be diff --git a/doc/git-annex-mirror.mdwn b/doc/git-annex-mirror.mdwn index 43e3971eb..68bce3d47 100644 --- a/doc/git-annex-mirror.mdwn +++ b/doc/git-annex-mirror.mdwn @@ -31,6 +31,11 @@ contents. Use the remote as the source repository, and mirror its contents to the local repository. +* `--jobs=N` `-JN` + + Enables parallel transfers with up to the specified number of jobs + running at once. For example: `-J10` + * `--all` Mirror all objects stored in the git annex, not only objects used by diff --git a/doc/git-annex-move.mdwn b/doc/git-annex-move.mdwn index 83e968c86..f1cdb0bec 100644 --- a/doc/git-annex-move.mdwn +++ b/doc/git-annex-move.mdwn @@ -22,6 +22,11 @@ Moves the content of files from or to another remote. 
Use this option to move the content of files from the local repository to the specified remote. +* `--jobs=N` `-JN` + + Enables parallel transfers with up to the specified number of jobs + running at once. For example: `-J10` + * `--all` Rather than specifying a filename or path to move, this option can be diff --git a/git-annex.cabal b/git-annex.cabal index fe4a547a4..a67cc8ea5 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -122,7 +122,8 @@ Executable git-annex monad-control, transformers, bloomfilter, edit-distance, resourcet, http-conduit, http-types, - esqueleto, persistent-sqlite, persistent, persistent-template + esqueleto, persistent-sqlite, persistent, persistent-template, + ascii-progress (<= 0.2.1.2), terminal-size CC-Options: -Wall GHC-Options: -Wall -fno-warn-tabs Extensions: PackageImports |