diff options
author | Joey Hess <joeyh@joeyh.name> | 2015-04-10 17:08:07 -0400 |
---|---|---|
committer | Joey Hess <joeyh@joeyh.name> | 2015-04-10 17:08:07 -0400 |
commit | c7a5cc5cc31377e5fed1fc59bfdeb503784d060d (patch) | |
tree | 2acb62d5fef9a1b2a02cbc43fa8c81b7108c91f1 /CmdLine | |
parent | b4dd6c2250efd959c2b0c0d032d9508ef55ea79a (diff) |
get, move, copy, mirror: Concurrent downloads and uploads are now supported!
This works, and seems fairly robust. Clean get of 20 files at -J3. At -J10,
there are some messages about ssh multiplexing, probably due to a race
spinning up the ssh connection cacher. But, it manages to get all the files
ok regardless.
The progress bars are a scrambled mess though, due to bugs in
ascii-progress, which I've already filed. Particularly this one:
https://github.com/yamadapc/haskell-ascii-progress/issues/8
Diffstat (limited to 'CmdLine')
-rw-r--r-- | CmdLine/Action.hs | 102 | ||||
-rw-r--r-- | CmdLine/GitAnnex/Options.hs | 8 | ||||
-rw-r--r-- | CmdLine/Seek.hs | 2 |
3 files changed, 106 insertions, 6 deletions
diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index 57e1fa60b..7cc6b8406 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -1,6 +1,6 @@ {- git-annex command-line actions - - - Copyright 2010-2014 Joey Hess <id@joeyh.name> + - Copyright 2010-2015 Joey Hess <id@joeyh.name> - - Licensed under the GNU GPL version 3 or higher. -} @@ -13,6 +13,13 @@ import Common.Annex import qualified Annex import Types.Command import qualified Annex.Queue +import Messages.Internal +import Types.Messages + +import Control.Concurrent.Async +import Control.Exception (throwIO) +import qualified Data.Map as M +import Data.Either type CommandActionRunner = CommandStart -> CommandCleanup @@ -24,6 +31,7 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa mapM_ runCheck c Annex.changeState $ \s -> s { Annex.errcounter = 0 } seek params + finishCommandActions cont showerrcount =<< Annex.getState Annex.errcounter where @@ -35,9 +43,93 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa - including by throwing IO errors (but other errors terminate the whole - command). - - - This should only be run in the seek stage. -} -commandAction :: CommandActionRunner -commandAction a = account =<< tryIO go + - When concurrency is enabled, a thread is forked off to run the action + - in the background, as soon as a free slot is available. + + - This should only be run in the seek stage. + -} +commandAction :: CommandStart -> Annex () +commandAction a = withOutputType go + where + go (ParallelOutput n) = do + ws <- Annex.getState Annex.workers + (st, ws') <- if null ws + then do + st <- newWorkerState + return (st, replicate (n-1) (Left st)) + else do + l <- liftIO $ drainTo (n-1) ws + findFreeSlot l + w <- liftIO $ async $ snd <$> Annex.run st run + Annex.changeState $ \s -> s { Annex.workers = Right w:ws' } + go _ = run + run = void $ includeCommandAction a + +{- Waits for any forked off command actions to finish. + - + - Merge together the cleanup actions of all the AnnexStates used by + - threads, into the current Annex's state, so they'll run at shutdown. + - + - Also merge together the errcounters of the AnnexStates. + -} +finishCommandActions :: Annex () +finishCommandActions = do + l <- liftIO . drainTo 0 =<< Annex.getState Annex.workers + forM_ (lefts l) $ \st -> do + forM_ (M.toList $ Annex.cleanup st) $ + uncurry Annex.addCleanup + Annex.changeState $ \s -> + s { Annex.errcounter = Annex.errcounter s + Annex.errcounter st } + +{- Wait for Asyncs from the list to finish, replacing them with their + - final AnnexStates, until the list of remaining Asyncs is not larger + - than the specified size, then returns the new list. + - + - If the action throws an exception, it is propigated, but first + - all other actions are waited for, to allow for a clean shutdown. + -} +drainTo + :: Int + -> [Either Annex.AnnexState (Async Annex.AnnexState)] + -> IO [Either Annex.AnnexState (Async Annex.AnnexState)] +drainTo sz l + | null as || sz >= length as = return l + | otherwise = do + (done, ret) <- waitAnyCatch as + let as' = filter (/= done) as + case ret of + Left e -> do + void $ drainTo 0 (map Left sts ++ map Right as') + throwIO e + Right st -> do + drainTo sz $ map Left (st:sts) ++ map Right as' + where + (sts, as) = partitionEithers l + +findFreeSlot :: [Either Annex.AnnexState (Async Annex.AnnexState)] -> Annex (Annex.AnnexState, [Either Annex.AnnexState (Async Annex.AnnexState)]) +findFreeSlot = go [] + where + go c [] = do + st <- newWorkerState + return (st, c) + go c (Left st:rest) = return (st, c ++ rest) + go c (v:rest) = go (v:c) rest + +{- From the current Annex state, get a state that is suitable for being + - used for a worker thread. Avoid sharing eg, open file handles. -} +newWorkerState :: Annex Annex.AnnexState +newWorkerState = do + st <- Annex.getState id + return $ st + { Annex.workers = [] + , Annex.catfilehandles = M.empty + , Annex.checkattrhandle = Nothing + , Annex.checkignorehandle = Nothing + } + +{- Like commandAction, but without the concurrency. -} +includeCommandAction :: CommandStart -> CommandCleanup +includeCommandAction a = account =<< tryIO go where go = do Annex.Queue.flushWhenFull @@ -58,7 +150,7 @@ commandAction a = account =<< tryIO go {- Runs a single command action through the start, perform and cleanup - stages, without catching errors. Useful if one command wants to run - part of another command. -} -callCommandAction :: CommandActionRunner +callCommandAction :: CommandStart -> CommandCleanup callCommandAction = start where start = stage $ maybe skip perform diff --git a/CmdLine/GitAnnex/Options.hs b/CmdLine/GitAnnex/Options.hs index be1c74ede..38fa93090 100644 --- a/CmdLine/GitAnnex/Options.hs +++ b/CmdLine/GitAnnex/Options.hs @@ -138,6 +138,14 @@ jsonOption :: Option jsonOption = Option ['j'] ["json"] (NoArg (Annex.setOutput JSONOutput)) "enable JSON output" +jobsOption :: Option +jobsOption = Option ['J'] ["jobs"] (ReqArg set paramNumber) + "enable concurrent jobs" + where + set s = case readish s of + Nothing -> error "Bad --jobs number" + Just n -> Annex.setOutput (ParallelOutput n) + timeLimitOption :: Option timeLimitOption = Option ['T'] ["time-limit"] (ReqArg Limit.addTimeLimit paramTime) diff --git a/CmdLine/Seek.hs b/CmdLine/Seek.hs index 1db075ec3..ea06fc976 100644 --- a/CmdLine/Seek.hs +++ b/CmdLine/Seek.hs @@ -84,7 +84,7 @@ withFilesInRefs a = mapM_ go case v of Nothing -> noop Just k -> whenM (matcher $ MatchingKey k) $ - void $ commandAction $ a f k + commandAction $ a f k withPathContents :: ((FilePath, FilePath) -> CommandStart) -> CommandSeek withPathContents a params = do |