diff options
-rw-r--r-- | Annex.hs | 5 | ||||
-rw-r--r-- | Annex/Transfer.hs | 7 | ||||
-rw-r--r-- | CmdLine/Action.hs | 27 | ||||
-rw-r--r-- | CmdLine/GitAnnex/Options.hs | 3 | ||||
-rw-r--r-- | Command/MetaData.hs | 2 | ||||
-rw-r--r-- | Messages.hs | 26 | ||||
-rw-r--r-- | Messages/Concurrent.hs | 33 | ||||
-rw-r--r-- | Messages/Internal.hs | 27 | ||||
-rw-r--r-- | Messages/Progress.hs | 37 | ||||
-rw-r--r-- | Types/Concurrency.hs | 8 | ||||
-rw-r--r-- | Types/Messages.hs | 4 | ||||
-rw-r--r-- | git-annex.cabal | 1 |
12 files changed, 96 insertions, 84 deletions
@@ -56,6 +56,7 @@ import Types.BranchState import Types.TrustLevel import Types.Group import Types.Messages +import Types.Concurrency import Types.UUID import Types.FileMatcher import Types.NumCopies @@ -101,6 +102,7 @@ data AnnexState = AnnexState , remotes :: [Types.Remote.RemoteA Annex] , remoteannexstate :: M.Map UUID AnnexState , output :: MessageState + , concurrency :: Concurrency , force :: Bool , fast :: Bool , daemon :: Bool @@ -134,7 +136,6 @@ data AnnexState = AnnexState , existinghooks :: M.Map Git.Hook.Hook Bool , desktopnotify :: DesktopNotify , workers :: [Either AnnexState (Async AnnexState)] - , concurrentjobs :: Maybe Int , activeremotes :: MVar (S.Set (Types.Remote.RemoteA Annex)) , keysdbhandle :: Maybe Keys.DbHandle , cachedcurrentbranch :: Maybe Git.Branch @@ -151,6 +152,7 @@ newState c r = do , remotes = [] , remoteannexstate = M.empty , output = def + , concurrency = NonConcurrent , force = False , fast = False , daemon = False @@ -184,7 +186,6 @@ newState c r = do , existinghooks = M.empty , desktopnotify = mempty , workers = [] - , concurrentjobs = Nothing , activeremotes = emptyactiveremotes , keysdbhandle = Nothing , cachedcurrentbranch = Nothing diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index 6ed8ca761..323600e96 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -28,6 +28,7 @@ import Utility.Metered import Annex.LockPool import Types.Remote (Verification(..)) import qualified Types.Remote as Remote +import Types.Concurrency import Control.Concurrent import qualified Data.Set as S @@ -180,11 +181,11 @@ forwardRetry old new = bytesComplete old < bytesComplete new - increase total transfer speed. -} pickRemote :: Observable v => [Remote] -> (Remote -> Annex v) -> Annex v -pickRemote l a = go l =<< Annex.getState Annex.concurrentjobs +pickRemote l a = go l =<< Annex.getState Annex.concurrency where go [] _ = return observeFailure go (r:[]) _ = a r - go rs (Just n) | n > 1 = do + go rs (Concurrent n) | n > 1 = do mv <- Annex.getState Annex.activeremotes active <- liftIO $ takeMVar mv let rs' = sortBy (inactiveFirst active) rs @@ -193,7 +194,7 @@ pickRemote l a = go l =<< Annex.getState Annex.concurrentjobs ok <- a r if observeBool ok then return ok - else go rs Nothing + else go rs NonConcurrent goconcurrent mv active [] = do liftIO $ putMVar mv active return observeFailure diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index 9b7cf7ecd..7d9dce574 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -13,6 +13,7 @@ import Annex.Common import qualified Annex import Annex.Concurrent import Types.Command +import Types.Concurrency import Messages.Concurrent import Types.Messages @@ -50,9 +51,9 @@ performCommandAction Command { cmdcheck = c, cmdname = name } seek cont = do - This should only be run in the seek stage. -} commandAction :: CommandStart -> Annex () -commandAction a = withOutputType go +commandAction a = go =<< Annex.getState Annex.concurrency where - go o@(ConcurrentOutput n _) = do + go (Concurrent n) = do ws <- Annex.getState Annex.workers (st, ws') <- if null ws then do @@ -62,9 +63,9 @@ commandAction a = withOutputType go l <- liftIO $ drainTo (n-1) ws findFreeSlot l w <- liftIO $ async - $ snd <$> Annex.run st (inOwnConsoleRegion o run) + $ snd <$> Annex.run st (inOwnConsoleRegion (Annex.output st) run) Annex.changeState $ \s -> s { Annex.workers = Right w:ws' } - go _ = run + go NonConcurrent = run run = void $ includeCommandAction a {- Waits for any forked off command actions to finish. @@ -151,19 +152,21 @@ callCommandAction' = start {- Do concurrent output when that has been requested. -} allowConcurrentOutput :: Annex a -> Annex a #ifdef WITH_CONCURRENTOUTPUT -allowConcurrentOutput a = go =<< Annex.getState Annex.concurrentjobs +allowConcurrentOutput a = go =<< Annex.getState Annex.concurrency where - go Nothing = a - go (Just n) = ifM (liftIO concurrentOutputSupported) + go NonConcurrent = a + go (Concurrent _) = ifM (liftIO concurrentOutputSupported) ( Regions.displayConsoleRegions $ - goconcurrent (ConcurrentOutput n True) - , goconcurrent (ConcurrentOutput n False) + goconcurrent True + , goconcurrent False ) - goconcurrent o = bracket_ (setup o) cleanup a - setup = Annex.setOutput + goconcurrent b = bracket_ (setup b) cleanup a + setup = setconcurrentenabled cleanup = do finishCommandActions - Annex.setOutput NormalOutput + setconcurrentenabled False + setconcurrentenabled b = Annex.changeState $ \s -> + s { Annex.output = (Annex.output s) { concurrentOutputEnabled = b } } #else allowConcurrentOutput = id #endif diff --git a/CmdLine/GitAnnex/Options.hs b/CmdLine/GitAnnex/Options.hs index 64f70d178..1c360de19 100644 --- a/CmdLine/GitAnnex/Options.hs +++ b/CmdLine/GitAnnex/Options.hs @@ -21,6 +21,7 @@ import Types.Messages import Types.Command import Types.DeferredParse import Types.DesktopNotify +import Types.Concurrency import qualified Annex import qualified Remote import qualified Limit @@ -302,7 +303,7 @@ jobsOption = globalSetter set $ ) where set n = do - Annex.changeState $ \s -> s { Annex.concurrentjobs = Just n } + Annex.changeState $ \s -> s { Annex.concurrency = Concurrent n } c <- liftIO getNumCapabilities when (n > c) $ liftIO $ setNumCapabilities n diff --git a/Command/MetaData.hs b/Command/MetaData.hs index e3cf921cb..bf71f7b4f 100644 --- a/Command/MetaData.hs +++ b/Command/MetaData.hs @@ -78,7 +78,7 @@ seek o = do (startKeys now o) (seeker $ whenAnnexed $ start now o) (forFiles o) - Batch -> withOutputType $ \ot -> case ot of + Batch -> withMessageState $ \s -> case outputType s of JSONOutput -> batchInput parseJSONInput $ commandAction . startBatch now _ -> error "--batch is currently only supported in --json mode" diff --git a/Messages.hs b/Messages.hs index f1055efb8..61702530f 100644 --- a/Messages.hs +++ b/Messages.hs @@ -40,7 +40,7 @@ module Messages ( commandProgressDisabled, outputMessage, implicitMessage, - withOutputType, + withMessageState, ) where import System.Log.Logger @@ -155,17 +155,15 @@ indent = intercalate "\n" . map (\l -> " " ++ l) . lines {- Shows a JSON chunk only when in json mode. -} maybeShowJSON :: JSONChunk v -> Annex () -maybeShowJSON v = withOutputType $ liftIO . go - where - go JSONOutput = JSON.add v - go _ = return () +maybeShowJSON v = withMessageState $ \s -> case outputType s of + JSONOutput -> liftIO $ JSON.add v + _ -> return () {- Shows a complete JSON value, only when in json mode. -} showFullJSON :: JSONChunk v -> Annex Bool -showFullJSON v = withOutputType $ liftIO . go - where - go JSONOutput = JSON.complete v >> return True - go _ = return False +showFullJSON v = withMessageState $ \s -> case outputType s of + JSONOutput -> liftIO $ JSON.complete v >> return True + _ -> return False {- Performs an action that outputs nonstandard/customized output, and - in JSON mode wraps its output in JSON.start and JSON.end, so it's @@ -216,11 +214,11 @@ debugEnabled = do {- Should commands that normally output progress messages have that - output disabled? -} commandProgressDisabled :: Annex Bool -commandProgressDisabled = withOutputType $ \t -> return $ case t of - QuietOutput -> True - JSONOutput -> True - NormalOutput -> False - ConcurrentOutput {} -> True +commandProgressDisabled = withMessageState $ \s -> return $ + case outputType s of + QuietOutput -> True + JSONOutput -> True + NormalOutput -> concurrentOutputEnabled s {- Use to show a message that is displayed implicitly, and so might be - disabled when running a certian command that needs more control over its diff --git a/Messages/Concurrent.hs b/Messages/Concurrent.hs index 91b840231..41153d067 100644 --- a/Messages/Concurrent.hs +++ b/Messages/Concurrent.hs @@ -31,13 +31,13 @@ import GHC.IO.Encoding - When built without concurrent-output support, the fallback action is run - instead. -} -concurrentMessage :: OutputType -> Bool -> String -> Annex () -> Annex () +concurrentMessage :: MessageState -> Bool -> String -> Annex () -> Annex () #ifdef WITH_CONCURRENTOUTPUT -concurrentMessage o iserror msg fallback - | concurrentOutputEnabled o = +concurrentMessage s iserror msg fallback + | concurrentOutputEnabled s = go =<< consoleRegion <$> Annex.getState Annex.output #else -concurrentMessage _o _iserror _msg fallback +concurrentMessage _s _iserror _msg fallback #endif | otherwise = fallback #ifdef WITH_CONCURRENTOUTPUT @@ -50,8 +50,8 @@ concurrentMessage _o _iserror _msg fallback -- console regions are in use, so set the errflag -- to get it to display to stderr later. when iserror $ do - Annex.changeState $ \s -> - s { Annex.output = (Annex.output s) { consoleRegionErrFlag = True } } + Annex.changeState $ \st -> + st { Annex.output = (Annex.output st) { consoleRegionErrFlag = True } } liftIO $ atomically $ do Regions.appendConsoleRegion r msg rl <- takeTMVar Regions.regionList @@ -68,24 +68,24 @@ concurrentMessage _o _iserror _msg fallback - When not at a console, a region is not displayed until the action is - complete. -} -inOwnConsoleRegion :: OutputType -> Annex a -> Annex a +inOwnConsoleRegion :: MessageState -> Annex a -> Annex a #ifdef WITH_CONCURRENTOUTPUT -inOwnConsoleRegion o a - | concurrentOutputEnabled o = do +inOwnConsoleRegion s a + | concurrentOutputEnabled s = do r <- mkregion setregion (Just r) eret <- tryNonAsync a `onException` rmregion r case eret of Left e -> do -- Add error message to region before it closes. - concurrentMessage o True (show e) noop + concurrentMessage s True (show e) noop rmregion r throwM e Right ret -> do rmregion r return ret #else -inOwnConsoleRegion _o a +inOwnConsoleRegion _s a #endif | otherwise = a #ifdef WITH_CONCURRENTOUTPUT @@ -94,12 +94,13 @@ inOwnConsoleRegion _o a -- a message is added to it. This avoids unnecessary screen -- updates when a region does not turn out to need to be used. mkregion = Regions.newConsoleRegion Regions.Linear "" - setregion r = Annex.changeState $ \s -> s { Annex.output = (Annex.output s) { consoleRegion = r } } + setregion r = Annex.changeState $ \st -> st + { Annex.output = (Annex.output st) { consoleRegion = r } } rmregion r = do errflag <- consoleRegionErrFlag <$> Annex.getState Annex.output let h = if errflag then Console.StdErr else Console.StdOut - Annex.changeState $ \s -> - s { Annex.output = (Annex.output s) { consoleRegionErrFlag = False } } + Annex.changeState $ \st -> st + { Annex.output = (Annex.output st) { consoleRegionErrFlag = False } } setregion Nothing liftIO $ atomically $ do t <- Regions.getConsoleRegion r @@ -135,7 +136,3 @@ concurrentOutputSupported = return True -- Windows is always unicode #else concurrentOutputSupported = return False #endif - -concurrentOutputEnabled :: OutputType -> Bool -concurrentOutputEnabled (ConcurrentOutput _ b) = b -concurrentOutputEnabled _ = False diff --git a/Messages/Internal.hs b/Messages/Internal.hs index 9b9edccc5..5c5b19bd1 100644 --- a/Messages/Internal.hs +++ b/Messages/Internal.hs @@ -12,25 +12,26 @@ import Annex import Types.Messages import Messages.Concurrent -withOutputType :: (OutputType -> Annex a) -> Annex a -withOutputType a = outputType <$> Annex.getState Annex.output >>= a +withMessageState :: (MessageState -> Annex a) -> Annex a +withMessageState a = Annex.getState Annex.output >>= a outputMessage :: IO () -> String -> Annex () -outputMessage json s = withOutputType go - where - go NormalOutput = liftIO $ - flushed $ putStr s - go QuietOutput = q - go o@(ConcurrentOutput {}) = concurrentMessage o False s q - go JSONOutput = liftIO $ flushed json +outputMessage json msg = withMessageState $ \s -> case outputType s of + NormalOutput + | concurrentOutputEnabled s -> concurrentMessage s False msg q + | otherwise -> liftIO $ flushed $ putStr msg + QuietOutput -> q + JSONOutput -> liftIO $ flushed json outputError :: String -> Annex () -outputError s = withOutputType go +outputError msg = withMessageState $ \s -> + if concurrentOutputEnabled s + then concurrentMessage s True msg go + else go where - go o@(ConcurrentOutput {}) = concurrentMessage o True s (go NormalOutput) - go _ = liftIO $ do + go = liftIO $ do hFlush stdout - hPutStr stderr s + hPutStr stderr msg hFlush stderr q :: Monad m => m () diff --git a/Messages/Progress.hs b/Messages/Progress.hs index f6541c191..fa11c1304 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -32,11 +32,11 @@ import Data.Quantity metered :: Maybe MeterUpdate -> Key -> (MeterUpdate -> Annex a) -> Annex a metered othermeter key a = case keySize key of Nothing -> nometer - Just size -> withOutputType (go $ fromInteger size) + Just size -> withMessageState (go $ fromInteger size) where - go _ QuietOutput = nometer - go _ JSONOutput = nometer - go size NormalOutput = do + go _ (MessageState { outputType = QuietOutput }) = nometer + go _ (MessageState { outputType = JSONOutput }) = nometer + go size (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do showOutput (progress, meter) <- mkmeter size m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $ \n -> do @@ -45,9 +45,9 @@ metered othermeter key a = case keySize key of r <- a (combinemeter m) liftIO $ clearMeter stdout meter return r + go size (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) = #if WITH_CONCURRENTOUTPUT - go size o@(ConcurrentOutput {}) - | concurrentOutputEnabled o = withProgressRegion $ \r -> do + withProgressRegion $ \r -> do (progress, meter) <- mkmeter size m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $ \n -> do setP progress $ fromBytesProcessed n @@ -55,9 +55,8 @@ metered othermeter key a = case keySize key of Regions.setConsoleRegion r ("\n" ++ s) a (combinemeter m) #else - go _size _o + nometer #endif - | otherwise = nometer mkmeter size = do progress <- liftIO $ newProgress "" size @@ -73,18 +72,18 @@ metered othermeter key a = case keySize key of {- Use when the progress meter is only desired for concurrent - output; as when a command's own progress output is preferred. -} concurrentMetered :: Maybe MeterUpdate -> Key -> (MeterUpdate -> Annex a) -> Annex a -concurrentMetered combinemeterupdate key a = withOutputType go - where - go (ConcurrentOutput {}) = metered combinemeterupdate key a - go _ = a (fromMaybe nullMeterUpdate combinemeterupdate) +concurrentMetered combinemeterupdate key a = + withMessageState $ \s -> if concurrentOutputEnabled s + then metered combinemeterupdate key a + else a (fromMaybe nullMeterUpdate combinemeterupdate) {- Poll file size to display meter, but only for concurrent output. -} concurrentMeteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a -concurrentMeteredFile file combinemeterupdate key a = withOutputType go - where - go (ConcurrentOutput {}) = metered combinemeterupdate key $ \p -> - watchFileSize file p a - go _ = a +concurrentMeteredFile file combinemeterupdate key a = + withMessageState $ \s -> if concurrentOutputEnabled s + then metered combinemeterupdate key $ \p -> + watchFileSize file p a + else a {- Progress dots. -} showProgressDots :: Annex () @@ -123,9 +122,9 @@ mkStderrRelayer = do - messing it up with interleaved stderr from a command. -} mkStderrEmitter :: Annex (String -> IO ()) -mkStderrEmitter = withOutputType go +mkStderrEmitter = withMessageState go where #ifdef WITH_CONCURRENTOUTPUT - go o | concurrentOutputEnabled o = return Console.errorConcurrent + go s | concurrentOutputEnabled s = return Console.errorConcurrent #endif go _ = return (hPutStrLn stderr) diff --git a/Types/Concurrency.hs b/Types/Concurrency.hs new file mode 100644 index 000000000..7fe5847d7 --- /dev/null +++ b/Types/Concurrency.hs @@ -0,0 +1,8 @@ +{- Copyright 2016 Joey Hess <id@joeyh.name> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Types.Concurrency where + +data Concurrency = NonConcurrent | Concurrent Int diff --git a/Types/Messages.hs b/Types/Messages.hs index 20c8051a0..597948426 100644 --- a/Types/Messages.hs +++ b/Types/Messages.hs @@ -15,7 +15,7 @@ import Data.Default import System.Console.Regions (ConsoleRegion) #endif -data OutputType = NormalOutput | QuietOutput | ConcurrentOutput Int Bool | JSONOutput +data OutputType = NormalOutput | QuietOutput | JSONOutput deriving (Show) data SideActionBlock = NoBlock | StartBlock | InBlock @@ -23,6 +23,7 @@ data SideActionBlock = NoBlock | StartBlock | InBlock data MessageState = MessageState { outputType :: OutputType + , concurrentOutputEnabled :: Bool , sideActionBlock :: SideActionBlock , implicitMessages :: Bool #ifdef WITH_CONCURRENTOUTPUT @@ -35,6 +36,7 @@ instance Default MessageState where def = MessageState { outputType = NormalOutput + , concurrentOutputEnabled = False , sideActionBlock = NoBlock , implicitMessages = True #ifdef WITH_CONCURRENTOUTPUT diff --git a/git-annex.cabal b/git-annex.cabal index 27593645b..c32da7e23 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -949,6 +949,7 @@ Executable git-annex Types.BranchState Types.CleanupActions Types.Command + Types.Concurrency Types.Creds Types.Crypto Types.DeferredParse |