summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- Annex.hs 3
-rw-r--r-- Annex/Drop.hs 2
-rw-r--r-- CmdLine/Action.hs 102
-rw-r--r-- CmdLine/GitAnnex/Options.hs 8
-rw-r--r-- CmdLine/Seek.hs 2
-rw-r--r-- Command/Get.hs 2
-rw-r--r-- Command/Mirror.hs 2
-rw-r--r-- Command/Move.hs 2
-rw-r--r-- Command/Sync.hs 6
-rw-r--r-- debian/changelog 2
-rw-r--r-- doc/git-annex-copy.mdwn 5
-rw-r--r-- doc/git-annex-get.mdwn 5
-rw-r--r-- doc/git-annex-mirror.mdwn 5
-rw-r--r-- doc/git-annex-move.mdwn 5
14 files changed, 138 insertions, 13 deletions
diff --git a/Annex.hs b/Annex.hs
index 1c8618cc0..d3425c7e2 100644
--- a/Annex.hs
+++ b/Annex.hs
@@ -68,6 +68,7 @@ import Utility.Url
import "mtl" Control.Monad.Reader
import Control.Concurrent
+import Control.Concurrent.Async
import qualified Data.Map as M
import qualified Data.Set as S
@@ -133,6 +134,7 @@ data AnnexState = AnnexState
#endif
, existinghooks :: M.Map Git.Hook.Hook Bool
, desktopnotify :: DesktopNotify
+ , workers :: [Either AnnexState (Async AnnexState)]
}
newState :: GitConfig -> Git.Repo -> AnnexState
@@ -178,6 +180,7 @@ newState c r = AnnexState
#endif
, existinghooks = M.empty
, desktopnotify = mempty
+ , workers = []
}
{- Makes an Annex state object for the specified git repo.
diff --git a/Annex/Drop.hs b/Annex/Drop.hs
index 6f3b95615..0ea815db2 100644
--- a/Annex/Drop.hs
+++ b/Annex/Drop.hs
@@ -42,7 +42,7 @@ type Reason = String
- The runner is used to run commands, and so can be either callCommand
- or commandAction.
-}
-handleDropsFrom :: [UUID] -> [Remote] -> Reason -> Bool -> Key -> AssociatedFile -> Maybe Remote -> CommandActionRunner -> Annex ()
+handleDropsFrom :: [UUID] -> [Remote] -> Reason -> Bool -> Key -> AssociatedFile -> Maybe Remote -> (CommandStart -> CommandCleanup) -> Annex ()
handleDropsFrom locs rs reason fromhere key afile knownpresentremote runner = do
fs <- ifM isDirect
( do
diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs
index 57e1fa60b..7cc6b8406 100644
--- a/CmdLine/Action.hs
+++ b/CmdLine/Action.hs
@@ -1,6 +1,6 @@
{- git-annex command-line actions
-
- - Copyright 2010-2014 Joey Hess <id@joeyh.name>
+ - Copyright 2010-2015 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
@@ -13,6 +13,13 @@ import Common.Annex
import qualified Annex
import Types.Command
import qualified Annex.Queue
+import Messages.Internal
+import Types.Messages
+
+import Control.Concurrent.Async
+import Control.Exception (throwIO)
+import qualified Data.Map as M
+import Data.Either
type CommandActionRunner = CommandStart -> CommandCleanup
@@ -24,6 +31,7 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa
mapM_ runCheck c
Annex.changeState $ \s -> s { Annex.errcounter = 0 }
seek params
+ finishCommandActions
cont
showerrcount =<< Annex.getState Annex.errcounter
where
@@ -35,9 +43,93 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa
- including by throwing IO errors (but other errors terminate the whole
- command).
-
- - This should only be run in the seek stage. -}
-commandAction :: CommandActionRunner
-commandAction a = account =<< tryIO go
+ - When concurrency is enabled, a thread is forked off to run the action
+ - in the background, as soon as a free slot is available.
+ -
+ - This should only be run in the seek stage.
+ -}
+commandAction :: CommandStart -> Annex ()
+commandAction a = withOutputType go
+ where
+ go (ParallelOutput n) = do
+ ws <- Annex.getState Annex.workers
+ (st, ws') <- if null ws
+ then do
+ st <- newWorkerState
+ return (st, replicate (n-1) (Left st))
+ else do
+ l <- liftIO $ drainTo (n-1) ws
+ findFreeSlot l
+ w <- liftIO $ async $ snd <$> Annex.run st run
+ Annex.changeState $ \s -> s { Annex.workers = Right w:ws' }
+ go _ = run
+ run = void $ includeCommandAction a
+
+{- Waits for any forked off command actions to finish.
+ -
+ - Merge together the cleanup actions of all the AnnexStates used by
+ - threads, into the current Annex's state, so they'll run at shutdown.
+ -
+ - Also merge together the errcounters of the AnnexStates.
+ -}
+finishCommandActions :: Annex ()
+finishCommandActions = do
+ l <- liftIO . drainTo 0 =<< Annex.getState Annex.workers
+ forM_ (lefts l) $ \st -> do
+ forM_ (M.toList $ Annex.cleanup st) $
+ uncurry Annex.addCleanup
+ Annex.changeState $ \s ->
+ s { Annex.errcounter = Annex.errcounter s + Annex.errcounter st }
+
+{- Waits for Asyncs from the list to finish, replacing them with their
+ - final AnnexStates, until the list of remaining Asyncs is not larger
+ - than the specified size, then returns the new list.
+ -
+ - If the action throws an exception, it is propagated, but first
+ - all other actions are waited for, to allow for a clean shutdown.
+ -}
+drainTo
+ :: Int
+ -> [Either Annex.AnnexState (Async Annex.AnnexState)]
+ -> IO [Either Annex.AnnexState (Async Annex.AnnexState)]
+drainTo sz l
+ | null as || sz >= length as = return l
+ | otherwise = do
+ (done, ret) <- waitAnyCatch as
+ let as' = filter (/= done) as
+ case ret of
+ Left e -> do
+ void $ drainTo 0 (map Left sts ++ map Right as')
+ throwIO e
+ Right st -> do
+ drainTo sz $ map Left (st:sts) ++ map Right as'
+ where
+ (sts, as) = partitionEithers l
+
+findFreeSlot :: [Either Annex.AnnexState (Async Annex.AnnexState)] -> Annex (Annex.AnnexState, [Either Annex.AnnexState (Async Annex.AnnexState)])
+findFreeSlot = go []
+ where
+ go c [] = do
+ st <- newWorkerState
+ return (st, c)
+ go c (Left st:rest) = return (st, c ++ rest)
+ go c (v:rest) = go (v:c) rest
+
+{- From the current Annex state, get a state that is suitable for being
+ - used for a worker thread. Avoids sharing things such as open file handles. -}
+newWorkerState :: Annex Annex.AnnexState
+newWorkerState = do
+ st <- Annex.getState id
+ return $ st
+ { Annex.workers = []
+ , Annex.catfilehandles = M.empty
+ , Annex.checkattrhandle = Nothing
+ , Annex.checkignorehandle = Nothing
+ }
+
+{- Like commandAction, but without the concurrency. -}
+includeCommandAction :: CommandStart -> CommandCleanup
+includeCommandAction a = account =<< tryIO go
where
go = do
Annex.Queue.flushWhenFull
@@ -58,7 +150,7 @@ commandAction a = account =<< tryIO go
{- Runs a single command action through the start, perform and cleanup
- stages, without catching errors. Useful if one command wants to run
- part of another command. -}
-callCommandAction :: CommandActionRunner
+callCommandAction :: CommandStart -> CommandCleanup
callCommandAction = start
where
start = stage $ maybe skip perform
diff --git a/CmdLine/GitAnnex/Options.hs b/CmdLine/GitAnnex/Options.hs
index be1c74ede..38fa93090 100644
--- a/CmdLine/GitAnnex/Options.hs
+++ b/CmdLine/GitAnnex/Options.hs
@@ -138,6 +138,14 @@ jsonOption :: Option
jsonOption = Option ['j'] ["json"] (NoArg (Annex.setOutput JSONOutput))
"enable JSON output"
+jobsOption :: Option
+jobsOption = Option ['J'] ["jobs"] (ReqArg set paramNumber)
+ "enable concurrent jobs"
+ where
+ set s = case readish s of
+ Nothing -> error "Bad --jobs number"
+ Just n -> Annex.setOutput (ParallelOutput n)
+
timeLimitOption :: Option
timeLimitOption = Option ['T'] ["time-limit"]
(ReqArg Limit.addTimeLimit paramTime)
diff --git a/CmdLine/Seek.hs b/CmdLine/Seek.hs
index 1db075ec3..ea06fc976 100644
--- a/CmdLine/Seek.hs
+++ b/CmdLine/Seek.hs
@@ -84,7 +84,7 @@ withFilesInRefs a = mapM_ go
case v of
Nothing -> noop
Just k -> whenM (matcher $ MatchingKey k) $
- void $ commandAction $ a f k
+ commandAction $ a f k
withPathContents :: ((FilePath, FilePath) -> CommandStart) -> CommandSeek
withPathContents a params = do
diff --git a/Command/Get.hs b/Command/Get.hs
index 922aee06a..111c69e32 100644
--- a/Command/Get.hs
+++ b/Command/Get.hs
@@ -21,7 +21,7 @@ cmd = [withOptions getOptions $ command "get" paramPaths seek
SectionCommon "make content of annexed files available"]
getOptions :: [Option]
-getOptions = fromOption : annexedMatchingOptions ++ keyOptions ++ [autoOption]
+getOptions = fromOption : autoOption : jobsOption : annexedMatchingOptions ++ keyOptions
seek :: CommandSeek
seek ps = do
diff --git a/Command/Mirror.hs b/Command/Mirror.hs
index a04efb89b..6c3895be1 100644
--- a/Command/Mirror.hs
+++ b/Command/Mirror.hs
@@ -21,7 +21,7 @@ cmd = [withOptions mirrorOptions $ command "mirror" paramPaths seek
SectionCommon "mirror content of files to/from another repository"]
mirrorOptions :: [Option]
-mirrorOptions = fromToOptions ++ annexedMatchingOptions ++ keyOptions
+mirrorOptions = fromToOptions ++ [jobsOption] ++ annexedMatchingOptions ++ keyOptions
seek :: CommandSeek
seek ps = do
diff --git a/Command/Move.hs b/Command/Move.hs
index bdc8018ba..91f7c8ea7 100644
--- a/Command/Move.hs
+++ b/Command/Move.hs
@@ -22,7 +22,7 @@ cmd = [withOptions moveOptions $ command "move" paramPaths seek
SectionCommon "move content of files to/from another repository"]
moveOptions :: [Option]
-moveOptions = fromToOptions ++ keyOptions ++ annexedMatchingOptions
+moveOptions = fromToOptions ++ [jobsOption] ++ keyOptions ++ annexedMatchingOptions
seek :: CommandSeek
seek ps = do
diff --git a/Command/Sync.hs b/Command/Sync.hs
index 130693909..7c1ed0067 100644
--- a/Command/Sync.hs
+++ b/Command/Sync.hs
@@ -389,7 +389,7 @@ syncFile rs f k = do
u <- getUUID
let locs' = concat [[u | got], putrs, locs]
- -- Using callCommandAction rather than commandAction for drops,
+ -- Using callCommandAction rather than includeCommandAction for drops,
-- because a failure to drop does not mean the sync failed.
handleDropsFrom locs' rs "unwanted" True k (Just f)
Nothing callCommandAction
@@ -403,7 +403,7 @@ syncFile rs f k = do
( return [ get have ]
, return []
)
- get have = commandAction $ do
+ get have = includeCommandAction $ do
showStart "get" f
next $ next $ getViaTmp k $ \dest -> getKeyFile' k (Just f) dest have
@@ -415,7 +415,7 @@ syncFile rs f k = do
, return []
)
put dest = do
- ok <- commandAction $ do
+ ok <- includeCommandAction $ do
showStart "copy" f
Command.Move.toStart' dest False (Just f) k
return (ok, if ok then Just (Remote.uuid dest) else Nothing)
diff --git a/debian/changelog b/debian/changelog
index f4afbc461..19dade5de 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -4,6 +4,8 @@ git-annex (5.20150410) UNRELEASED; urgency=medium
activity from other uuids.
* Union merge could fall over if there was a file in the repository
with the same name as a git ref. Now fixed.
+ * get, move, copy, mirror: Concurrent downloads and uploads are
+ now supported! For example: git-annex get -J10
-- Joey Hess <id@joeyh.name> Thu, 09 Apr 2015 20:59:43 -0400
diff --git a/doc/git-annex-copy.mdwn b/doc/git-annex-copy.mdwn
index e61cd1281..ea0b89670 100644
--- a/doc/git-annex-copy.mdwn
+++ b/doc/git-annex-copy.mdwn
@@ -22,6 +22,11 @@ Copies the content of files from or to another remote.
Use this option to copy the content of files from the local repository
to the specified remote.
+* `--jobs=N` `-JN`
+
+ Enables parallel transfers with up to the specified number of jobs
+ running at once. For example: `-J10`
+
* `--auto`
Rather than copying all files, only copy files that don't yet have
diff --git a/doc/git-annex-get.mdwn b/doc/git-annex-get.mdwn
index a72c79912..516e5da85 100644
--- a/doc/git-annex-get.mdwn
+++ b/doc/git-annex-get.mdwn
@@ -25,6 +25,11 @@ or transferring them from some kind of key-value store.
Normally git-annex will choose which remotes to get the content
from. Use this option to specify which remote to use.
+* `--jobs=N` `-JN`
+
+ Enables parallel downloads with up to the specified number of jobs
+ running at once. For example: `-J10`
+
* `--all`
Rather than specifying a filename or path to get, this option can be
diff --git a/doc/git-annex-mirror.mdwn b/doc/git-annex-mirror.mdwn
index 43e3971eb..68bce3d47 100644
--- a/doc/git-annex-mirror.mdwn
+++ b/doc/git-annex-mirror.mdwn
@@ -31,6 +31,11 @@ contents.
Use the remote as the source repository, and mirror its contents to the local
repository.
+* `--jobs=N` `-JN`
+
+ Enables parallel transfers with up to the specified number of jobs
+ running at once. For example: `-J10`
+
* `--all`
Mirror all objects stored in the git annex, not only objects used by
diff --git a/doc/git-annex-move.mdwn b/doc/git-annex-move.mdwn
index 83e968c86..f1cdb0bec 100644
--- a/doc/git-annex-move.mdwn
+++ b/doc/git-annex-move.mdwn
@@ -22,6 +22,11 @@ Moves the content of files from or to another remote.
Use this option to move the content of files from the local repository
to the specified remote.
+* `--jobs=N` `-JN`
+
+ Enables parallel transfers with up to the specified number of jobs
+ running at once. For example: `-J10`
+
* `--all`
Rather than specifying a filename or path to move, this option can be