summaryrefslogtreecommitdiff
path: root/CmdLine/Action.hs
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2015-04-10 17:08:07 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2015-04-10 17:08:07 -0400
commitc7a5cc5cc31377e5fed1fc59bfdeb503784d060d (patch)
tree2acb62d5fef9a1b2a02cbc43fa8c81b7108c91f1 /CmdLine/Action.hs
parentb4dd6c2250efd959c2b0c0d032d9508ef55ea79a (diff)
get, move, copy, mirror: Concurrent downloads and uploads are now supported!
This works, and seems fairly robust. Clean get of 20 files at -J3. At -J10, there are some messages about ssh multiplexing, probably due to a race spinning up the ssh connection cacher. But, it manages to get all the files ok regardless. The progress bars are a scrambled mess though, due to bugs in ascii-progress, which I've already filed. Particularly this one: https://github.com/yamadapc/haskell-ascii-progress/issues/8
Diffstat (limited to 'CmdLine/Action.hs')
-rw-r--r--CmdLine/Action.hs102
1 files changed, 97 insertions, 5 deletions
diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs
index 57e1fa60b..7cc6b8406 100644
--- a/CmdLine/Action.hs
+++ b/CmdLine/Action.hs
@@ -1,6 +1,6 @@
{- git-annex command-line actions
-
- - Copyright 2010-2014 Joey Hess <id@joeyh.name>
+ - Copyright 2010-2015 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
@@ -13,6 +13,13 @@ import Common.Annex
import qualified Annex
import Types.Command
import qualified Annex.Queue
+import Messages.Internal
+import Types.Messages
+
+import Control.Concurrent.Async
+import Control.Exception (throwIO)
+import qualified Data.Map as M
+import Data.Either
type CommandActionRunner = CommandStart -> CommandCleanup
@@ -24,6 +31,7 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa
mapM_ runCheck c
Annex.changeState $ \s -> s { Annex.errcounter = 0 }
seek params
+ finishCommandActions
cont
showerrcount =<< Annex.getState Annex.errcounter
where
@@ -35,9 +43,93 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa
- including by throwing IO errors (but other errors terminate the whole
- command).
-
- - This should only be run in the seek stage. -}
-commandAction :: CommandActionRunner
-commandAction a = account =<< tryIO go
+ - When concurrency is enabled, a thread is forked off to run the action
+ - in the background, as soon as a free slot is available.
+
+ - This should only be run in the seek stage.
+ -}
+commandAction :: CommandStart -> Annex ()
+commandAction a = withOutputType go
+ where
+ go (ParallelOutput n) = do
+ ws <- Annex.getState Annex.workers
+ (st, ws') <- if null ws
+ then do
+ st <- newWorkerState
+ return (st, replicate (n-1) (Left st))
+ else do
+ l <- liftIO $ drainTo (n-1) ws
+ findFreeSlot l
+ w <- liftIO $ async $ snd <$> Annex.run st run
+ Annex.changeState $ \s -> s { Annex.workers = Right w:ws' }
+ go _ = run
+ run = void $ includeCommandAction a
+
+{- Waits for any forked off command actions to finish.
+ -
+ - Merge together the cleanup actions of all the AnnexStates used by
+ - threads, into the current Annex's state, so they'll run at shutdown.
+ -
+ - Also merge together the errcounters of the AnnexStates.
+ -}
+finishCommandActions :: Annex ()
+finishCommandActions = do
+ l <- liftIO . drainTo 0 =<< Annex.getState Annex.workers
+ forM_ (lefts l) $ \st -> do
+ forM_ (M.toList $ Annex.cleanup st) $
+ uncurry Annex.addCleanup
+ Annex.changeState $ \s ->
+ s { Annex.errcounter = Annex.errcounter s + Annex.errcounter st }
+
+{- Wait for Asyncs from the list to finish, replacing them with their
+ - final AnnexStates, until the list of remaining Asyncs is not larger
+ - than the specified size, then return the new list.
+ -
+ - If an action throws an exception, it is propagated, but first
+ - all other actions are waited for, to allow for a clean shutdown.
+ -}
+drainTo
+ :: Int
+ -> [Either Annex.AnnexState (Async Annex.AnnexState)]
+ -> IO [Either Annex.AnnexState (Async Annex.AnnexState)]
+drainTo sz l
+ | null as || sz >= length as = return l
+ | otherwise = do
+ (done, ret) <- waitAnyCatch as
+ let as' = filter (/= done) as
+ case ret of
+ Left e -> do
+ void $ drainTo 0 (map Left sts ++ map Right as')
+ throwIO e
+ Right st -> do
+ drainTo sz $ map Left (st:sts) ++ map Right as'
+ where
+ (sts, as) = partitionEithers l
+
+findFreeSlot :: [Either Annex.AnnexState (Async Annex.AnnexState)] -> Annex (Annex.AnnexState, [Either Annex.AnnexState (Async Annex.AnnexState)])
+findFreeSlot = go []
+ where
+ go c [] = do
+ st <- newWorkerState
+ return (st, c)
+ go c (Left st:rest) = return (st, c ++ rest)
+ go c (v:rest) = go (v:c) rest
+
+{- From the current Annex state, get a state that is suitable for being
+ - used for a worker thread. Avoid sharing e.g. open file handles. -}
+newWorkerState :: Annex Annex.AnnexState
+newWorkerState = do
+ st <- Annex.getState id
+ return $ st
+ { Annex.workers = []
+ , Annex.catfilehandles = M.empty
+ , Annex.checkattrhandle = Nothing
+ , Annex.checkignorehandle = Nothing
+ }
+
+{- Like commandAction, but without the concurrency. -}
+includeCommandAction :: CommandStart -> CommandCleanup
+includeCommandAction a = account =<< tryIO go
where
go = do
Annex.Queue.flushWhenFull
@@ -58,7 +150,7 @@ commandAction a = account =<< tryIO go
{- Runs a single command action through the start, perform and cleanup
- stages, without catching errors. Useful if one command wants to run
- part of another command. -}
-callCommandAction :: CommandActionRunner
+callCommandAction :: CommandStart -> CommandCleanup
callCommandAction = start
where
start = stage $ maybe skip perform