diff options
-rw-r--r-- | Annex.hs | 3 | ||||
-rw-r--r-- | Annex/Drop.hs | 2 | ||||
-rw-r--r-- | CmdLine/Action.hs | 102 | ||||
-rw-r--r-- | CmdLine/GitAnnex/Options.hs | 8 | ||||
-rw-r--r-- | CmdLine/Seek.hs | 2 | ||||
-rw-r--r-- | Command/Get.hs | 2 | ||||
-rw-r--r-- | Command/Mirror.hs | 2 | ||||
-rw-r--r-- | Command/Move.hs | 2 | ||||
-rw-r--r-- | Command/Sync.hs | 6 | ||||
-rw-r--r-- | debian/changelog | 2 | ||||
-rw-r--r-- | doc/git-annex-copy.mdwn | 5 | ||||
-rw-r--r-- | doc/git-annex-get.mdwn | 5 | ||||
-rw-r--r-- | doc/git-annex-mirror.mdwn | 5 | ||||
-rw-r--r-- | doc/git-annex-move.mdwn | 5 |
14 files changed, 138 insertions, 13 deletions
@@ -68,6 +68,7 @@ import Utility.Url import "mtl" Control.Monad.Reader import Control.Concurrent +import Control.Concurrent.Async import qualified Data.Map as M import qualified Data.Set as S @@ -133,6 +134,7 @@ data AnnexState = AnnexState #endif , existinghooks :: M.Map Git.Hook.Hook Bool , desktopnotify :: DesktopNotify + , workers :: [Either AnnexState (Async AnnexState)] } newState :: GitConfig -> Git.Repo -> AnnexState @@ -178,6 +180,7 @@ newState c r = AnnexState #endif , existinghooks = M.empty , desktopnotify = mempty + , workers = [] } {- Makes an Annex state object for the specified git repo. diff --git a/Annex/Drop.hs b/Annex/Drop.hs index 6f3b95615..0ea815db2 100644 --- a/Annex/Drop.hs +++ b/Annex/Drop.hs @@ -42,7 +42,7 @@ type Reason = String - The runner is used to run commands, and so can be either callCommand - or commandAction. -} -handleDropsFrom :: [UUID] -> [Remote] -> Reason -> Bool -> Key -> AssociatedFile -> Maybe Remote -> CommandActionRunner -> Annex () +handleDropsFrom :: [UUID] -> [Remote] -> Reason -> Bool -> Key -> AssociatedFile -> Maybe Remote -> (CommandStart -> CommandCleanup) -> Annex () handleDropsFrom locs rs reason fromhere key afile knownpresentremote runner = do fs <- ifM isDirect ( do diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index 57e1fa60b..7cc6b8406 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -1,6 +1,6 @@ {- git-annex command-line actions - - - Copyright 2010-2014 Joey Hess <id@joeyh.name> + - Copyright 2010-2015 Joey Hess <id@joeyh.name> - - Licensed under the GNU GPL version 3 or higher. -} @@ -13,6 +13,13 @@ import Common.Annex import qualified Annex import Types.Command import qualified Annex.Queue +import Messages.Internal +import Types.Messages + +import Control.Concurrent.Async +import Control.Exception (throwIO) +import qualified Data.Map as M +import Data.Either type CommandActionRunner = CommandStart -> CommandCleanup @@ -24,6 +31,7 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa mapM_ runCheck c Annex.changeState $ \s -> s { Annex.errcounter = 0 } seek params + finishCommandActions cont showerrcount =<< Annex.getState Annex.errcounter where @@ -35,9 +43,93 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa - including by throwing IO errors (but other errors terminate the whole - command). - - - This should only be run in the seek stage. -} -commandAction :: CommandActionRunner -commandAction a = account =<< tryIO go + - When concurrency is enabled, a thread is forked off to run the action + - in the background, as soon as a free slot is available. + + - This should only be run in the seek stage. + -} +commandAction :: CommandStart -> Annex () +commandAction a = withOutputType go + where + go (ParallelOutput n) = do + ws <- Annex.getState Annex.workers + (st, ws') <- if null ws + then do + st <- newWorkerState + return (st, replicate (n-1) (Left st)) + else do + l <- liftIO $ drainTo (n-1) ws + findFreeSlot l + w <- liftIO $ async $ snd <$> Annex.run st run + Annex.changeState $ \s -> s { Annex.workers = Right w:ws' } + go _ = run + run = void $ includeCommandAction a + +{- Waits for any forked off command actions to finish. + - + - Merge together the cleanup actions of all the AnnexStates used by + - threads, into the current Annex's state, so they'll run at shutdown. + - + - Also merge together the errcounters of the AnnexStates. + -} +finishCommandActions :: Annex () +finishCommandActions = do + l <- liftIO . drainTo 0 =<< Annex.getState Annex.workers + forM_ (lefts l) $ \st -> do + forM_ (M.toList $ Annex.cleanup st) $ + uncurry Annex.addCleanup + Annex.changeState $ \s -> + s { Annex.errcounter = Annex.errcounter s + Annex.errcounter st } + +{- Wait for Asyncs from the list to finish, replacing them with their + - final AnnexStates, until the list of remaining Asyncs is not larger + - than the specified size, then returns the new list. + - + - If the action throws an exception, it is propigated, but first + - all other actions are waited for, to allow for a clean shutdown. + -} +drainTo + :: Int + -> [Either Annex.AnnexState (Async Annex.AnnexState)] + -> IO [Either Annex.AnnexState (Async Annex.AnnexState)] +drainTo sz l + | null as || sz >= length as = return l + | otherwise = do + (done, ret) <- waitAnyCatch as + let as' = filter (/= done) as + case ret of + Left e -> do + void $ drainTo 0 (map Left sts ++ map Right as') + throwIO e + Right st -> do + drainTo sz $ map Left (st:sts) ++ map Right as' + where + (sts, as) = partitionEithers l + +findFreeSlot :: [Either Annex.AnnexState (Async Annex.AnnexState)] -> Annex (Annex.AnnexState, [Either Annex.AnnexState (Async Annex.AnnexState)]) +findFreeSlot = go [] + where + go c [] = do + st <- newWorkerState + return (st, c) + go c (Left st:rest) = return (st, c ++ rest) + go c (v:rest) = go (v:c) rest + +{- From the current Annex state, get a state that is suitable for being + - used for a worker thread. Avoid sharing eg, open file handles. -} +newWorkerState :: Annex Annex.AnnexState +newWorkerState = do + st <- Annex.getState id + return $ st + { Annex.workers = [] + , Annex.catfilehandles = M.empty + , Annex.checkattrhandle = Nothing + , Annex.checkignorehandle = Nothing + } + +{- Like commandAction, but without the concurrency. -} +includeCommandAction :: CommandStart -> CommandCleanup +includeCommandAction a = account =<< tryIO go where go = do Annex.Queue.flushWhenFull @@ -58,7 +150,7 @@ commandAction a = account =<< tryIO go {- Runs a single command action through the start, perform and cleanup - stages, without catching errors. Useful if one command wants to run - part of another command. -} -callCommandAction :: CommandActionRunner +callCommandAction :: CommandStart -> CommandCleanup callCommandAction = start where start = stage $ maybe skip perform diff --git a/CmdLine/GitAnnex/Options.hs b/CmdLine/GitAnnex/Options.hs index be1c74ede..38fa93090 100644 --- a/CmdLine/GitAnnex/Options.hs +++ b/CmdLine/GitAnnex/Options.hs @@ -138,6 +138,14 @@ jsonOption :: Option jsonOption = Option ['j'] ["json"] (NoArg (Annex.setOutput JSONOutput)) "enable JSON output" +jobsOption :: Option +jobsOption = Option ['J'] ["jobs"] (ReqArg set paramNumber) + "enable concurrent jobs" + where + set s = case readish s of + Nothing -> error "Bad --jobs number" + Just n -> Annex.setOutput (ParallelOutput n) + timeLimitOption :: Option timeLimitOption = Option ['T'] ["time-limit"] (ReqArg Limit.addTimeLimit paramTime) diff --git a/CmdLine/Seek.hs b/CmdLine/Seek.hs index 1db075ec3..ea06fc976 100644 --- a/CmdLine/Seek.hs +++ b/CmdLine/Seek.hs @@ -84,7 +84,7 @@ withFilesInRefs a = mapM_ go case v of Nothing -> noop Just k -> whenM (matcher $ MatchingKey k) $ - void $ commandAction $ a f k + commandAction $ a f k withPathContents :: ((FilePath, FilePath) -> CommandStart) -> CommandSeek withPathContents a params = do diff --git a/Command/Get.hs b/Command/Get.hs index 922aee06a..111c69e32 100644 --- a/Command/Get.hs +++ b/Command/Get.hs @@ -21,7 +21,7 @@ cmd = [withOptions getOptions $ command "get" paramPaths seek SectionCommon "make content of annexed files available"] getOptions :: [Option] -getOptions = fromOption : annexedMatchingOptions ++ keyOptions ++ [autoOption] +getOptions = fromOption : autoOption : jobsOption : annexedMatchingOptions ++ keyOptions seek :: CommandSeek seek ps = do diff --git a/Command/Mirror.hs b/Command/Mirror.hs index a04efb89b..6c3895be1 100644 --- a/Command/Mirror.hs +++ b/Command/Mirror.hs @@ -21,7 +21,7 @@ cmd = [withOptions mirrorOptions $ command "mirror" paramPaths seek SectionCommon "mirror content of files to/from another repository"] mirrorOptions :: [Option] -mirrorOptions = fromToOptions ++ annexedMatchingOptions ++ keyOptions +mirrorOptions = fromToOptions ++ [jobsOption] ++ annexedMatchingOptions ++ keyOptions seek :: CommandSeek seek ps = do diff --git a/Command/Move.hs b/Command/Move.hs index bdc8018ba..91f7c8ea7 100644 --- a/Command/Move.hs +++ b/Command/Move.hs @@ -22,7 +22,7 @@ cmd = [withOptions moveOptions $ command "move" paramPaths seek SectionCommon "move content of files to/from another repository"] moveOptions :: [Option] -moveOptions = fromToOptions ++ keyOptions ++ annexedMatchingOptions +moveOptions = fromToOptions ++ [jobsOption] ++ keyOptions ++ annexedMatchingOptions seek :: CommandSeek seek ps = do diff --git a/Command/Sync.hs b/Command/Sync.hs index 130693909..7c1ed0067 100644 --- a/Command/Sync.hs +++ b/Command/Sync.hs @@ -389,7 +389,7 @@ syncFile rs f k = do u <- getUUID let locs' = concat [[u | got], putrs, locs] - -- Using callCommandAction rather than commandAction for drops, + -- Using callCommandAction rather than includeCommandAction for drops, -- because a failure to drop does not mean the sync failed. handleDropsFrom locs' rs "unwanted" True k (Just f) Nothing callCommandAction @@ -403,7 +403,7 @@ syncFile rs f k = do ( return [ get have ] , return [] ) - get have = commandAction $ do + get have = includeCommandAction $ do showStart "get" f next $ next $ getViaTmp k $ \dest -> getKeyFile' k (Just f) dest have @@ -415,7 +415,7 @@ syncFile rs f k = do , return [] ) put dest = do - ok <- commandAction $ do + ok <- includeCommandAction $ do showStart "copy" f Command.Move.toStart' dest False (Just f) k return (ok, if ok then Just (Remote.uuid dest) else Nothing) diff --git a/debian/changelog b/debian/changelog index f4afbc461..19dade5de 100644 --- a/debian/changelog +++ b/debian/changelog @@ -4,6 +4,8 @@ git-annex (5.20150410) UNRELEASED; urgency=medium activity from other uuids. * Union merge could fall over if there was a file in the repository with the same name as a git ref. Now fixed. + * get, move, copy, mirror: Concurrent downloads and uploads are + now supported! For example: git-annex get -J10 -- Joey Hess <id@joeyh.name> Thu, 09 Apr 2015 20:59:43 -0400 diff --git a/doc/git-annex-copy.mdwn b/doc/git-annex-copy.mdwn index e61cd1281..ea0b89670 100644 --- a/doc/git-annex-copy.mdwn +++ b/doc/git-annex-copy.mdwn @@ -22,6 +22,11 @@ Copies the content of files from or to another remote. Use this option to copy the content of files from the local repository to the specified remote. +* `--jobs=N` `-JN` + + Enables parallel transfers with up to the specified number of jobs + running at once. For example: `-J10` + * `--auto` Rather than copying all files, only copy files that don't yet have diff --git a/doc/git-annex-get.mdwn b/doc/git-annex-get.mdwn index a72c79912..516e5da85 100644 --- a/doc/git-annex-get.mdwn +++ b/doc/git-annex-get.mdwn @@ -25,6 +25,11 @@ or transferring them from some kind of key-value store. Normally git-annex will choose which remotes to get the content from. Use this option to specify which remote to use. +* `--jobs=N` `-JN` + + Enables parallel download with up to the specified number of jobs + running at once. For example: `-J10` + * `--all` Rather than specifying a filename or path to get, this option can be diff --git a/doc/git-annex-mirror.mdwn b/doc/git-annex-mirror.mdwn index 43e3971eb..68bce3d47 100644 --- a/doc/git-annex-mirror.mdwn +++ b/doc/git-annex-mirror.mdwn @@ -31,6 +31,11 @@ contents. Use the remote as the source repository, and mirror its contents to the local repository. +* `--jobs=N` `-JN` + + Enables parallel transfers with up to the specified number of jobs + running at once. For example: `-J10` + * `--all` Mirror all objects stored in the git annex, not only objects used by diff --git a/doc/git-annex-move.mdwn b/doc/git-annex-move.mdwn index 83e968c86..f1cdb0bec 100644 --- a/doc/git-annex-move.mdwn +++ b/doc/git-annex-move.mdwn @@ -22,6 +22,11 @@ Moves the content of files from or to another remote. Use this option to move the content of files from the local repository to the specified remote. +* `--jobs=N` `-JN` + + Enables parallel transfers with up to the specified number of jobs + running at once. For example: `-J10` + * `--all` Rather than specifying a filename or path to move, this option can be |