diff options
author | Joey Hess <joeyh@joeyh.name> | 2016-09-09 15:49:44 -0400 |
---|---|---|
committer | Joey Hess <joeyh@joeyh.name> | 2016-09-09 15:51:34 -0400 |
commit | d1f722e3b0b74a9d9a2e35ac1f47dbca9f7cf606 (patch) | |
tree | 74314a3747d4a0456135bf38b2e1115b5a3f64b9 | |
parent | 2408f5c6084aa04a09b36edcd264ce6bc7177c93 (diff) |
better locking for json with -J
Avoid threads emitting json at the same time and scrambling, which was
still possible even with the buffering, just less likely.
Converted json IO actions to JSONChunk data too.
-rw-r--r-- | Command/Whereis.hs | 5 | ||||
-rw-r--r-- | Messages.hs | 10 | ||||
-rw-r--r-- | Messages/Internal.hs | 26 | ||||
-rw-r--r-- | Messages/JSON.hs | 54 | ||||
-rw-r--r-- | Messages/Progress.hs | 5 | ||||
-rw-r--r-- | Types/Messages.hs | 5 |
6 files changed, 63 insertions, 42 deletions
diff --git a/Command/Whereis.hs b/Command/Whereis.hs index 3a21e0a3b..bcc11aaf7 100644 --- a/Command/Whereis.hs +++ b/Command/Whereis.hs @@ -105,7 +105,6 @@ showRemoteUrls :: M.Map UUID Remote -> (UUID, [URLString]) -> Annex () showRemoteUrls remotemap (uu, us) | null us = noop | otherwise = case M.lookup uu remotemap of - Just r -> do - let ls = unlines $ map (\u -> name r ++ ": " ++ u) us - outputMessage noop ('\n' : indent ls ++ "\n") + Just r -> showLongNote $ + unlines $ map (\u -> name r ++ ": " ++ u) us Nothing -> noop diff --git a/Messages.hs b/Messages.hs index 38b8ad890..53f356c1d 100644 --- a/Messages.hs +++ b/Messages.hs @@ -85,7 +85,7 @@ showSideAction m = Annex.getState Annex.output >>= go Annex.changeState $ \s -> s { Annex.output = st' } | sideActionBlock st == InBlock = return () | otherwise = p - p = outputMessage q $ "(" ++ m ++ "...)\n" + p = outputMessage JSON.none $ "(" ++ m ++ "...)\n" showStoringStateAction :: Annex () showStoringStateAction = showSideAction "recording state in git" @@ -110,7 +110,7 @@ doSideAction' b a = do {- Make way for subsequent output of a command. -} showOutput :: Annex () showOutput = unlessM commandProgressDisabled $ - outputMessage q "\n" + outputMessage JSON.none "\n" showLongNote :: String -> Annex () showLongNote s = outputMessage (JSON.note s) ('\n' : indent s ++ "\n") @@ -140,7 +140,7 @@ earlyWarning = warning' False warning' :: Bool -> String -> Annex () warning' makeway w = do when makeway $ - outputMessage q "\n" + outputMessage JSON.none "\n" outputError (w ++ "\n") {- Not concurrent output safe. -} @@ -173,10 +173,10 @@ showCustom command a = do outputMessage (JSON.end r) "" showHeader :: String -> Annex () -showHeader h = outputMessage q $ (h ++ ": ") +showHeader h = outputMessage JSON.none $ (h ++ ": ") showRaw :: String -> Annex () -showRaw s = outputMessage q (s ++ "\n") +showRaw s = outputMessage JSON.none (s ++ "\n") setupConsole :: IO () setupConsole = do diff --git a/Messages/Internal.hs b/Messages/Internal.hs index bf212b71b..2c9a461a5 100644 --- a/Messages/Internal.hs +++ b/Messages/Internal.hs @@ -11,17 +11,20 @@ import Common import Annex import Types.Messages import Messages.Concurrent +import Messages.JSON + +import qualified Data.ByteString.Lazy as B withMessageState :: (MessageState -> Annex a) -> Annex a withMessageState a = Annex.getState Annex.output >>= a -outputMessage :: IO () -> String -> Annex () +outputMessage :: JSONChunk -> String -> Annex () outputMessage = outputMessage' False -outputMessageFinal :: IO () -> String -> Annex () +outputMessageFinal :: JSONChunk -> String -> Annex () outputMessageFinal = outputMessage' True -outputMessage' :: Bool -> IO () -> String -> Annex () +outputMessage' :: Bool -> JSONChunk -> String -> Annex () outputMessage' endmessage json msg = withMessageState $ \s -> case outputType s of NormalOutput | concurrentOutputEnabled s -> concurrentMessage s False msg q @@ -29,7 +32,7 @@ outputMessage' endmessage json msg = withMessageState $ \s -> case outputType s JSONOutput _ -> void $ outputJSON json endmessage s QuietOutput -> q -outputJSON :: IO () -> Bool -> MessageState -> Annex Bool +outputJSON :: JSONChunk -> Bool -> MessageState -> Annex Bool outputJSON json endmessage s = case outputType s of JSONOutput withprogress | withprogress || concurrentOutputEnabled s -> do @@ -37,20 +40,17 @@ outputJSON json endmessage s = case outputType s of if endmessage then do Annex.changeState $ \st -> - st { Annex.output = s { jsonBuffer = [] } } - liftIO $ flushed $ do - showJSONBuffer s - json + st { Annex.output = s { jsonBuffer = none } } + liftIO $ flushed $ emit b else Annex.changeState $ \st -> - st { Annex.output = s { jsonBuffer = json : jsonBuffer s } } + st { Annex.output = s { jsonBuffer = b } } return True | otherwise -> do - liftIO $ flushed json + liftIO $ flushed $ emit json return True _ -> return False - -showJSONBuffer :: MessageState -> IO () -showJSONBuffer s = sequence_ $ reverse $ jsonBuffer s + where + b = jsonBuffer s `B.append` json outputError :: String -> Annex () outputError msg = withMessageState $ \s -> diff --git a/Messages/JSON.hs b/Messages/JSON.hs index 7b94aa220..3baeaef3f 100644 --- a/Messages/JSON.hs +++ b/Messages/JSON.hs @@ -8,6 +8,9 @@ {-# LANGUAGE OverloadedStrings #-} module Messages.JSON ( + JSONChunk, + emit, + none, start, end, note, @@ -25,6 +28,8 @@ import qualified Data.Map as M import qualified Data.Text as T import qualified Data.ByteString.Lazy as B import System.IO +import System.IO.Unsafe (unsafePerformIO) +import Control.Concurrent import Data.Maybe import Data.Monoid import Prelude @@ -34,8 +39,24 @@ import Types.Key import Utility.Metered import Utility.Percentage -start :: String -> Maybe FilePath -> Maybe Key -> IO () -start command file key = B.hPut stdout $ Stream.start $ Stream.AesonObject o +type JSONChunk = B.ByteString + +-- A global lock to avoid concurrent threads emitting json at the same time. +{-# NOINLINE emitLock #-} +emitLock :: MVar () +emitLock = unsafePerformIO $ newMVar () + +emit :: JSONChunk -> IO () +emit v = do + takeMVar emitLock + B.hPut stdout v + putMVar emitLock () + +none :: JSONChunk +none = B.empty + +start :: String -> Maybe FilePath -> Maybe Key -> JSONChunk +start command file key = Stream.start $ Stream.AesonObject o where Object o = toJSON $ JSONActionItem { itemCommand = Just command @@ -44,25 +65,26 @@ start command file key = B.hPut stdout $ Stream.start $ Stream.AesonObject o , itemAdded = Nothing } -end :: Bool -> IO () -end b = B.hPut stdout $ Stream.add (Stream.JSONChunk [("success", b)]) `B.append` Stream.end +end :: Bool -> JSONChunk +end b =Stream.add (Stream.JSONChunk [("success", b)]) `B.append` Stream.end -note :: String -> IO () +note :: String -> JSONChunk note s = add (Stream.JSONChunk [("note", s)]) -add :: Stream.JSONChunk a -> IO () -add = B.hPut stdout . Stream.add +add :: Stream.JSONChunk a -> JSONChunk +add = Stream.add -complete :: Stream.JSONChunk a -> IO () -complete v = B.hPut stdout $ Stream.start v `B.append` Stream.end +complete :: Stream.JSONChunk a -> JSONChunk +complete v = Stream.start v `B.append` Stream.end -progress :: IO () -> Integer -> BytesProcessed -> IO () -progress jsonbuffer size bytesprocessed = do - B.hPut stdout $ Stream.start $ Stream.AesonObject o - putStr ",\"action\":" - jsonbuffer - B.hPut stdout $ Stream.end - B.hPut stdout $ Stream.end +progress :: B.ByteString -> Integer -> BytesProcessed -> IO () +progress jsonbuffer size bytesprocessed = emit $ B.concat + [ Stream.start $ Stream.AesonObject o + , ",\"action\":" + , jsonbuffer + , "}" + , Stream.end + ] where n = fromBytesProcessed bytesprocessed :: Integer Object o = object diff --git a/Messages/Progress.hs b/Messages/Progress.hs index a48e7b07e..39709a211 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -11,7 +11,6 @@ module Messages.Progress where import Common import Messages -import Messages.Internal import Utility.Metered import Types import Types.Messages @@ -59,7 +58,7 @@ metered othermeter key a = case keySize key of #endif go _ (MessageState { outputType = JSONOutput False }) = nometer go size (MessageState { outputType = JSONOutput True }) = do - buf <- withMessageState $ return . showJSONBuffer + buf <- withMessageState $ return . jsonBuffer m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $ JSON.progress buf size a (combinemeter m) @@ -93,7 +92,7 @@ concurrentMeteredFile file combinemeterupdate key a = {- Progress dots. -} showProgressDots :: Annex () -showProgressDots = outputMessage q "." +showProgressDots = outputMessage JSON.none "." {- Runs a command, that may output progress to either stdout or - stderr, as well as other messages. diff --git a/Types/Messages.hs b/Types/Messages.hs index a155cae2a..49242ea45 100644 --- a/Types/Messages.hs +++ b/Types/Messages.hs @@ -10,6 +10,7 @@ module Types.Messages where import Data.Default +import qualified Data.ByteString.Lazy as B #ifdef WITH_CONCURRENTOUTPUT import System.Console.Regions (ConsoleRegion) @@ -30,7 +31,7 @@ data MessageState = MessageState , consoleRegion :: Maybe ConsoleRegion , consoleRegionErrFlag :: Bool #endif - , jsonBuffer :: [IO ()] + , jsonBuffer :: B.ByteString } instance Default MessageState @@ -44,5 +45,5 @@ instance Default MessageState , consoleRegion = Nothing , consoleRegionErrFlag = False #endif - , jsonBuffer = [] + , jsonBuffer = B.empty } |