aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2016-09-09 15:49:44 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2016-09-09 15:51:34 -0400
commitd1f722e3b0b74a9d9a2e35ac1f47dbca9f7cf606 (patch)
tree74314a3747d4a0456135bf38b2e1115b5a3f64b9
parent2408f5c6084aa04a09b36edcd264ce6bc7177c93 (diff)
better locking for json with -J
Avoid threads emitting json at the same time and scrambling, which was still possible even with the buffering, just less likely. Converted json IO actions to JSONChunk data too.
-rw-r--r--Command/Whereis.hs5
-rw-r--r--Messages.hs10
-rw-r--r--Messages/Internal.hs26
-rw-r--r--Messages/JSON.hs54
-rw-r--r--Messages/Progress.hs5
-rw-r--r--Types/Messages.hs5
6 files changed, 63 insertions, 42 deletions
diff --git a/Command/Whereis.hs b/Command/Whereis.hs
index 3a21e0a3b..bcc11aaf7 100644
--- a/Command/Whereis.hs
+++ b/Command/Whereis.hs
@@ -105,7 +105,6 @@ showRemoteUrls :: M.Map UUID Remote -> (UUID, [URLString]) -> Annex ()
showRemoteUrls remotemap (uu, us)
| null us = noop
| otherwise = case M.lookup uu remotemap of
- Just r -> do
- let ls = unlines $ map (\u -> name r ++ ": " ++ u) us
- outputMessage noop ('\n' : indent ls ++ "\n")
+ Just r -> showLongNote $
+ unlines $ map (\u -> name r ++ ": " ++ u) us
Nothing -> noop
diff --git a/Messages.hs b/Messages.hs
index 38b8ad890..53f356c1d 100644
--- a/Messages.hs
+++ b/Messages.hs
@@ -85,7 +85,7 @@ showSideAction m = Annex.getState Annex.output >>= go
Annex.changeState $ \s -> s { Annex.output = st' }
| sideActionBlock st == InBlock = return ()
| otherwise = p
- p = outputMessage q $ "(" ++ m ++ "...)\n"
+ p = outputMessage JSON.none $ "(" ++ m ++ "...)\n"
showStoringStateAction :: Annex ()
showStoringStateAction = showSideAction "recording state in git"
@@ -110,7 +110,7 @@ doSideAction' b a = do
{- Make way for subsequent output of a command. -}
showOutput :: Annex ()
showOutput = unlessM commandProgressDisabled $
- outputMessage q "\n"
+ outputMessage JSON.none "\n"
showLongNote :: String -> Annex ()
showLongNote s = outputMessage (JSON.note s) ('\n' : indent s ++ "\n")
@@ -140,7 +140,7 @@ earlyWarning = warning' False
warning' :: Bool -> String -> Annex ()
warning' makeway w = do
when makeway $
- outputMessage q "\n"
+ outputMessage JSON.none "\n"
outputError (w ++ "\n")
{- Not concurrent output safe. -}
@@ -173,10 +173,10 @@ showCustom command a = do
outputMessage (JSON.end r) ""
showHeader :: String -> Annex ()
-showHeader h = outputMessage q $ (h ++ ": ")
+showHeader h = outputMessage JSON.none $ (h ++ ": ")
showRaw :: String -> Annex ()
-showRaw s = outputMessage q (s ++ "\n")
+showRaw s = outputMessage JSON.none (s ++ "\n")
setupConsole :: IO ()
setupConsole = do
diff --git a/Messages/Internal.hs b/Messages/Internal.hs
index bf212b71b..2c9a461a5 100644
--- a/Messages/Internal.hs
+++ b/Messages/Internal.hs
@@ -11,17 +11,20 @@ import Common
import Annex
import Types.Messages
import Messages.Concurrent
+import Messages.JSON
+
+import qualified Data.ByteString.Lazy as B
withMessageState :: (MessageState -> Annex a) -> Annex a
withMessageState a = Annex.getState Annex.output >>= a
-outputMessage :: IO () -> String -> Annex ()
+outputMessage :: JSONChunk -> String -> Annex ()
outputMessage = outputMessage' False
-outputMessageFinal :: IO () -> String -> Annex ()
+outputMessageFinal :: JSONChunk -> String -> Annex ()
outputMessageFinal = outputMessage' True
-outputMessage' :: Bool -> IO () -> String -> Annex ()
+outputMessage' :: Bool -> JSONChunk -> String -> Annex ()
outputMessage' endmessage json msg = withMessageState $ \s -> case outputType s of
NormalOutput
| concurrentOutputEnabled s -> concurrentMessage s False msg q
@@ -29,7 +32,7 @@ outputMessage' endmessage json msg = withMessageState $ \s -> case outputType s
JSONOutput _ -> void $ outputJSON json endmessage s
QuietOutput -> q
-outputJSON :: IO () -> Bool -> MessageState -> Annex Bool
+outputJSON :: JSONChunk -> Bool -> MessageState -> Annex Bool
outputJSON json endmessage s = case outputType s of
JSONOutput withprogress
| withprogress || concurrentOutputEnabled s -> do
@@ -37,20 +40,17 @@ outputJSON json endmessage s = case outputType s of
if endmessage
then do
Annex.changeState $ \st ->
- st { Annex.output = s { jsonBuffer = [] } }
- liftIO $ flushed $ do
- showJSONBuffer s
- json
+ st { Annex.output = s { jsonBuffer = none } }
+ liftIO $ flushed $ emit b
else Annex.changeState $ \st ->
- st { Annex.output = s { jsonBuffer = json : jsonBuffer s } }
+ st { Annex.output = s { jsonBuffer = b } }
return True
| otherwise -> do
- liftIO $ flushed json
+ liftIO $ flushed $ emit json
return True
_ -> return False
-
-showJSONBuffer :: MessageState -> IO ()
-showJSONBuffer s = sequence_ $ reverse $ jsonBuffer s
+ where
+ b = jsonBuffer s `B.append` json
outputError :: String -> Annex ()
outputError msg = withMessageState $ \s ->
diff --git a/Messages/JSON.hs b/Messages/JSON.hs
index 7b94aa220..3baeaef3f 100644
--- a/Messages/JSON.hs
+++ b/Messages/JSON.hs
@@ -8,6 +8,9 @@
{-# LANGUAGE OverloadedStrings #-}
module Messages.JSON (
+ JSONChunk,
+ emit,
+ none,
start,
end,
note,
@@ -25,6 +28,8 @@ import qualified Data.Map as M
import qualified Data.Text as T
import qualified Data.ByteString.Lazy as B
import System.IO
+import System.IO.Unsafe (unsafePerformIO)
+import Control.Concurrent
import Data.Maybe
import Data.Monoid
import Prelude
@@ -34,8 +39,24 @@ import Types.Key
import Utility.Metered
import Utility.Percentage
-start :: String -> Maybe FilePath -> Maybe Key -> IO ()
-start command file key = B.hPut stdout $ Stream.start $ Stream.AesonObject o
+type JSONChunk = B.ByteString
+
+-- A global lock to avoid concurrent threads emitting json at the same time.
+{-# NOINLINE emitLock #-}
+emitLock :: MVar ()
+emitLock = unsafePerformIO $ newMVar ()
+
+emit :: JSONChunk -> IO ()
+emit v = do
+ takeMVar emitLock
+ B.hPut stdout v
+ putMVar emitLock ()
+
+none :: JSONChunk
+none = B.empty
+
+start :: String -> Maybe FilePath -> Maybe Key -> JSONChunk
+start command file key = Stream.start $ Stream.AesonObject o
where
Object o = toJSON $ JSONActionItem
{ itemCommand = Just command
@@ -44,25 +65,26 @@ start command file key = B.hPut stdout $ Stream.start $ Stream.AesonObject o
, itemAdded = Nothing
}
-end :: Bool -> IO ()
-end b = B.hPut stdout $ Stream.add (Stream.JSONChunk [("success", b)]) `B.append` Stream.end
+end :: Bool -> JSONChunk
+end b =Stream.add (Stream.JSONChunk [("success", b)]) `B.append` Stream.end
-note :: String -> IO ()
+note :: String -> JSONChunk
note s = add (Stream.JSONChunk [("note", s)])
-add :: Stream.JSONChunk a -> IO ()
-add = B.hPut stdout . Stream.add
+add :: Stream.JSONChunk a -> JSONChunk
+add = Stream.add
-complete :: Stream.JSONChunk a -> IO ()
-complete v = B.hPut stdout $ Stream.start v `B.append` Stream.end
+complete :: Stream.JSONChunk a -> JSONChunk
+complete v = Stream.start v `B.append` Stream.end
-progress :: IO () -> Integer -> BytesProcessed -> IO ()
-progress jsonbuffer size bytesprocessed = do
- B.hPut stdout $ Stream.start $ Stream.AesonObject o
- putStr ",\"action\":"
- jsonbuffer
- B.hPut stdout $ Stream.end
- B.hPut stdout $ Stream.end
+progress :: B.ByteString -> Integer -> BytesProcessed -> IO ()
+progress jsonbuffer size bytesprocessed = emit $ B.concat
+ [ Stream.start $ Stream.AesonObject o
+ , ",\"action\":"
+ , jsonbuffer
+ , "}"
+ , Stream.end
+ ]
where
n = fromBytesProcessed bytesprocessed :: Integer
Object o = object
diff --git a/Messages/Progress.hs b/Messages/Progress.hs
index a48e7b07e..39709a211 100644
--- a/Messages/Progress.hs
+++ b/Messages/Progress.hs
@@ -11,7 +11,6 @@ module Messages.Progress where
import Common
import Messages
-import Messages.Internal
import Utility.Metered
import Types
import Types.Messages
@@ -59,7 +58,7 @@ metered othermeter key a = case keySize key of
#endif
go _ (MessageState { outputType = JSONOutput False }) = nometer
go size (MessageState { outputType = JSONOutput True }) = do
- buf <- withMessageState $ return . showJSONBuffer
+ buf <- withMessageState $ return . jsonBuffer
m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $
JSON.progress buf size
a (combinemeter m)
@@ -93,7 +92,7 @@ concurrentMeteredFile file combinemeterupdate key a =
{- Progress dots. -}
showProgressDots :: Annex ()
-showProgressDots = outputMessage q "."
+showProgressDots = outputMessage JSON.none "."
{- Runs a command, that may output progress to either stdout or
- stderr, as well as other messages.
diff --git a/Types/Messages.hs b/Types/Messages.hs
index a155cae2a..49242ea45 100644
--- a/Types/Messages.hs
+++ b/Types/Messages.hs
@@ -10,6 +10,7 @@
module Types.Messages where
import Data.Default
+import qualified Data.ByteString.Lazy as B
#ifdef WITH_CONCURRENTOUTPUT
import System.Console.Regions (ConsoleRegion)
@@ -30,7 +31,7 @@ data MessageState = MessageState
, consoleRegion :: Maybe ConsoleRegion
, consoleRegionErrFlag :: Bool
#endif
- , jsonBuffer :: [IO ()]
+ , jsonBuffer :: B.ByteString
}
instance Default MessageState
@@ -44,5 +45,5 @@ instance Default MessageState
, consoleRegion = Nothing
, consoleRegionErrFlag = False
#endif
- , jsonBuffer = []
+ , jsonBuffer = B.empty
}